X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=5d0ca38d6a2f1a1398161c7e73246a47f7b216f6;hp=f3b4e416403b0594a22da9ff47de2f68a1d284cc;hb=43f89a73d733c3c43a875b3724b5a68470894450;hpb=b3c034675957f963c5878ce1e5e183ec2de8b5e2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f3b4e41640..5d0ca38d6a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -7,20 +7,20 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.Status; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import akka.actor.ActorRef; -import akka.actor.Status; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -29,33 +29,36 @@ import com.google.common.cache.CacheBuilder; */ public class ShardCommitCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class); - - private static final Object CAN_COMMIT_REPLY_TRUE = - new CanCommitTransactionReply(Boolean.TRUE).toSerializable(); - - private static final Object CAN_COMMIT_REPLY_FALSE = - new CanCommitTransactionReply(Boolean.FALSE).toSerializable(); - private final Cache cohortCache; private CohortEntry currentCohortEntry; private final Queue queuedCohortEntries; - private final int queueCapacity; + private int queueCapacity; + + private final Logger log; + + private final String name; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) { + public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log, + String name) { cohortCache = CacheBuilder.newBuilder().expireAfterAccess( cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); this.queueCapacity = queueCapacity; + this.log = log; + this.name = name; // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. queuedCohortEntries = new LinkedList<>(); } + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + /** * This method caches a cohort entry for the given transactions ID in preparation for the * subsequent 3-phase commit. @@ -80,9 +83,9 @@ public class ShardCommitCoordinator { public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender, final ActorRef shard) { String transactionID = canCommit.getTransactionID(); - if(LOG.isDebugEnabled()) { - LOG.debug("Processing canCommit for transaction {} for shard {}", - transactionID, shard.path()); + if(log.isDebugEnabled()) { + log.debug("{}: Processing canCommit for transaction {} for shard {}", + name, transactionID, shard.path()); } // Lookup the cohort entry that was cached previously (or should have been) by @@ -92,8 +95,8 @@ public class ShardCommitCoordinator { // Either canCommit was invoked before ready(shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache. IllegalStateException ex = new IllegalStateException( - String.format("No cohort entry found for transaction %s", transactionID)); - LOG.error(ex.getMessage()); + String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); return; } @@ -104,8 +107,8 @@ public class ShardCommitCoordinator { if(currentCohortEntry != null) { // There's already a Tx commit in progress - attempt to queue this entry to be // committed after the current Tx completes. - LOG.debug("Transaction {} is already in progress - queueing transaction {}", - currentCohortEntry.getTransactionID(), transactionID); + log.debug("{}: Transaction {} is already in progress - queueing transaction {}", + name, currentCohortEntry.getTransactionID(), transactionID); if(queuedCohortEntries.size() < queueCapacity) { queuedCohortEntries.offer(cohortEntry); @@ -113,10 +116,10 @@ public class ShardCommitCoordinator { removeCohortEntry(transactionID); RuntimeException ex = new RuntimeException( - String.format("Could not enqueue transaction %s - the maximum commit queue"+ + String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ " capacity %d has been reached.", - transactionID, queueCapacity)); - LOG.error(ex.getMessage()); + name, transactionID, queueCapacity)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); } } else { @@ -138,14 +141,15 @@ public class ShardCommitCoordinator { Boolean canCommit = cohortEntry.getCohort().canCommit().get(); cohortEntry.getCanCommitSender().tell( - canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard()); + canCommit ? CanCommitTransactionReply.YES.toSerializable() : + CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard()); if(!canCommit) { // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); } } catch (InterruptedException | ExecutionException e) { - LOG.debug("An exception occurred during canCommit", e); + log.debug("{}: An exception occurred during canCommit: {}", name, e); // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); @@ -206,6 +210,7 @@ public class ShardCommitCoordinator { // Dequeue the next cohort entry waiting in the queue. currentCohortEntry = queuedCohortEntries.poll(); if(currentCohortEntry != null) { + currentCohortEntry.updateLastAccessTime(); doCanCommit(currentCohortEntry); } } @@ -261,5 +266,12 @@ public class ShardCommitCoordinator { void setShard(ActorRef shard) { this.shard = shard; } + + boolean hasModifications(){ + if(modification instanceof CompositeModification){ + return ((CompositeModification) modification).getModifications().size() > 0; + } + return true; + } } }