X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=659acb745473af389480ff08e19cc68857add93a;hp=f3b4e416403b0594a22da9ff47de2f68a1d284cc;hb=58c031c8fbec9f8d7e05eac60c1bea168d3471f6;hpb=b3c034675957f963c5878ce1e5e183ec2de8b5e2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f3b4e41640..659acb7454 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -7,6 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.event.LoggingAdapter; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; @@ -15,12 +20,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import akka.actor.ActorRef; -import akka.actor.Status; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -29,14 +28,6 @@ import com.google.common.cache.CacheBuilder; */ public class ShardCommitCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class); - - private static final Object CAN_COMMIT_REPLY_TRUE = - new CanCommitTransactionReply(Boolean.TRUE).toSerializable(); - - private static final Object CAN_COMMIT_REPLY_FALSE = - new CanCommitTransactionReply(Boolean.FALSE).toSerializable(); - private final Cache cohortCache; private CohortEntry currentCohortEntry; @@ -45,11 +36,18 @@ public class ShardCommitCoordinator { private final int queueCapacity; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) { + private final LoggingAdapter log; + + private final String name; + + public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log, + String name) { cohortCache = CacheBuilder.newBuilder().expireAfterAccess( cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); this.queueCapacity = queueCapacity; + this.log = log; + this.name = name; // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. @@ -80,9 +78,9 @@ public class ShardCommitCoordinator { public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender, final ActorRef shard) { String transactionID = canCommit.getTransactionID(); - if(LOG.isDebugEnabled()) { - LOG.debug("Processing canCommit for transaction {} for shard {}", - transactionID, shard.path()); + if(log.isDebugEnabled()) { + log.debug("{}: Processing canCommit for transaction {} for shard {}", + name, transactionID, shard.path()); } // Lookup the cohort entry that was cached previously (or should have been) by @@ -92,8 +90,8 @@ public class ShardCommitCoordinator { // Either canCommit was invoked before ready(shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache. IllegalStateException ex = new IllegalStateException( - String.format("No cohort entry found for transaction %s", transactionID)); - LOG.error(ex.getMessage()); + String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); return; } @@ -104,8 +102,8 @@ public class ShardCommitCoordinator { if(currentCohortEntry != null) { // There's already a Tx commit in progress - attempt to queue this entry to be // committed after the current Tx completes. - LOG.debug("Transaction {} is already in progress - queueing transaction {}", - currentCohortEntry.getTransactionID(), transactionID); + log.debug("{}: Transaction {} is already in progress - queueing transaction {}", + name, currentCohortEntry.getTransactionID(), transactionID); if(queuedCohortEntries.size() < queueCapacity) { queuedCohortEntries.offer(cohortEntry); @@ -113,10 +111,10 @@ public class ShardCommitCoordinator { removeCohortEntry(transactionID); RuntimeException ex = new RuntimeException( - String.format("Could not enqueue transaction %s - the maximum commit queue"+ + String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ " capacity %d has been reached.", - transactionID, queueCapacity)); - LOG.error(ex.getMessage()); + name, transactionID, queueCapacity)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); } } else { @@ -138,14 +136,15 @@ public class ShardCommitCoordinator { Boolean canCommit = cohortEntry.getCohort().canCommit().get(); cohortEntry.getCanCommitSender().tell( - canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard()); + canCommit ? CanCommitTransactionReply.YES.toSerializable() : + CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard()); if(!canCommit) { // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); } } catch (InterruptedException | ExecutionException e) { - LOG.debug("An exception occurred during canCommit", e); + log.debug("{}: An exception occurred during canCommit: {}", name, e); // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID());