X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=951bc22545804af11bbc37fd8ef28d03cd53fc02;hp=19fa26682e2a4cea7b637fda85064a3aea0226e5;hb=a23ab6d60b7b57184a8fe59e282e46b448c86d6a;hpb=fcf65d723ef53f8da2dd6347f41ce19016fc36e5 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 19fa26682e..951bc22545 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -29,27 +28,36 @@ import org.slf4j.LoggerFactory; */ public class ShardCommitCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class); - private final Cache cohortCache; private CohortEntry currentCohortEntry; private final Queue queuedCohortEntries; - private final int queueCapacity; + private int queueCapacity; + + private final Logger log; + + private final String name; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) { + public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log, + String name) { cohortCache = CacheBuilder.newBuilder().expireAfterAccess( cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); this.queueCapacity = queueCapacity; + this.log = log; + this.name = name; // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. queuedCohortEntries = new LinkedList<>(); } + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + /** * This method caches a cohort entry for the given transactions ID in preparation for the * subsequent 3-phase commit. @@ -74,9 +82,9 @@ public class ShardCommitCoordinator { public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender, final ActorRef shard) { String transactionID = canCommit.getTransactionID(); - if(LOG.isDebugEnabled()) { - LOG.debug("Processing canCommit for transaction {} for shard {}", - transactionID, shard.path()); + if(log.isDebugEnabled()) { + log.debug("{}: Processing canCommit for transaction {} for shard {}", + name, transactionID, shard.path()); } // Lookup the cohort entry that was cached previously (or should have been) by @@ -86,8 +94,8 @@ public class ShardCommitCoordinator { // Either canCommit was invoked before ready(shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache. IllegalStateException ex = new IllegalStateException( - String.format("No cohort entry found for transaction %s", transactionID)); - LOG.error(ex.getMessage()); + String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); return; } @@ -98,8 +106,8 @@ public class ShardCommitCoordinator { if(currentCohortEntry != null) { // There's already a Tx commit in progress - attempt to queue this entry to be // committed after the current Tx completes. - LOG.debug("Transaction {} is already in progress - queueing transaction {}", - currentCohortEntry.getTransactionID(), transactionID); + log.debug("{}: Transaction {} is already in progress - queueing transaction {}", + name, currentCohortEntry.getTransactionID(), transactionID); if(queuedCohortEntries.size() < queueCapacity) { queuedCohortEntries.offer(cohortEntry); @@ -107,10 +115,10 @@ public class ShardCommitCoordinator { removeCohortEntry(transactionID); RuntimeException ex = new RuntimeException( - String.format("Could not enqueue transaction %s - the maximum commit queue"+ + String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ " capacity %d has been reached.", - transactionID, queueCapacity)); - LOG.error(ex.getMessage()); + name, transactionID, queueCapacity)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); } } else { @@ -140,7 +148,7 @@ public class ShardCommitCoordinator { removeCohortEntry(cohortEntry.getTransactionID()); } } catch (InterruptedException | ExecutionException e) { - LOG.debug("An exception occurred during canCommit", e); + log.debug("{}: An exception occurred during canCommit: {}", name, e); // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); @@ -201,6 +209,7 @@ public class ShardCommitCoordinator { // Dequeue the next cohort entry waiting in the queue. currentCohortEntry = queuedCohortEntries.poll(); if(currentCohortEntry != null) { + currentCohortEntry.updateLastAccessTime(); doCanCommit(currentCohortEntry); } }