X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=54f15fcb4bd03115d97eccc72166b0b12efbca2f;hp=f3b4e416403b0594a22da9ff47de2f68a1d284cc;hb=4349b034606957d3e876b82b14a292e6739a986a;hpb=43f5c8e686ccb3a5b196d3d64721cb4ec86ee3d1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index f3b4e41640..54f15fcb4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -7,20 +7,29 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import akka.actor.ActorRef; -import akka.actor.Status; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; /** * Coordinates commits for a shard ensuring only one concurrent 3-phase commit. @@ -29,47 +38,119 @@ import com.google.common.cache.CacheBuilder; */ public class ShardCommitCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class); - - private static final Object CAN_COMMIT_REPLY_TRUE = - new CanCommitTransactionReply(Boolean.TRUE).toSerializable(); - - private static final Object CAN_COMMIT_REPLY_FALSE = - new CanCommitTransactionReply(Boolean.FALSE).toSerializable(); + // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + public interface CohortDecorator { + DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + } private final Cache cohortCache; private CohortEntry currentCohortEntry; + private final DOMTransactionFactory transactionFactory; + private final Queue queuedCohortEntries; - private final int queueCapacity; + private int queueCapacity; + + private final Logger log; + + private final String name; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) { - cohortCache = CacheBuilder.newBuilder().expireAfterAccess( - cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); + private final String shardActorPath; + + private final RemovalListener cacheRemovalListener = + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + if(notification.getCause() == RemovalCause.EXPIRED) { + log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey()); + } + } + }; + + // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + private CohortDecorator cohortDecorator; + + public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, + long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; + this.log = log; + this.name = name; + this.transactionFactory = transactionFactory; + + shardActorPath = Serialization.serializedActorPath(shardActor); + + cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). + removalListener(cacheRemovalListener).build(); // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. queuedCohortEntries = new LinkedList<>(); } + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + /** - * This method caches a cohort entry for the given transactions ID in preparation for the - * subsequent 3-phase commit. + * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches + * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit. * * @param transactionID the ID of the transaction * @param cohort the cohort to participate in the transaction commit - * @param modification the modification made by the transaction + * @param modification the modifications made by the transaction */ public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + MutableCompositeModification modification) { cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification)); } + /** + * This method handles a BatchedModifications message for a transaction being prepared directly on the + * Shard actor instead of via a ShardTransaction actor. If there's no currently cached + * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If + * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created. + * + * @param batched the BatchedModifications + * @param shardActor the transaction's shard actor + * + * @throws ExecutionException if an error occurs loading the cache + */ + public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched) + throws ExecutionException { + CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); + if(cohortEntry == null) { + cohortEntry = new CohortEntry(batched.getTransactionID(), + transactionFactory.newTransaction( + TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(), + batched.getTransactionChainID())); + cohortCache.put(batched.getTransactionID(), cohortEntry); + } + + if(log.isDebugEnabled()) { + log.debug("{}: Applying {} batched modifications for Tx {}", name, + batched.getModifications().size(), batched.getTransactionID()); + } + + cohortEntry.applyModifications(batched.getModifications()); + + String cohortPath = null; + if(batched.isReady()) { + if(log.isDebugEnabled()) { + log.debug("{}: Readying Tx {}, client version {}", name, + batched.getTransactionID(), batched.getVersion()); + } + + cohortEntry.ready(cohortDecorator); + cohortPath = shardActorPath; + } + + return new BatchedModificationsReply(batched.getModifications().size(), cohortPath); + } + /** * This method handles the canCommit phase for a transaction. * @@ -80,9 +161,9 @@ public class ShardCommitCoordinator { public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender, final ActorRef shard) { String transactionID = canCommit.getTransactionID(); - if(LOG.isDebugEnabled()) { - LOG.debug("Processing canCommit for transaction {} for shard {}", - transactionID, shard.path()); + if(log.isDebugEnabled()) { + log.debug("{}: Processing canCommit for transaction {} for shard {}", + name, transactionID, shard.path()); } // Lookup the cohort entry that was cached previously (or should have been) by @@ -92,8 +173,8 @@ public class ShardCommitCoordinator { // Either canCommit was invoked before ready(shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache. IllegalStateException ex = new IllegalStateException( - String.format("No cohort entry found for transaction %s", transactionID)); - LOG.error(ex.getMessage()); + String.format("%s: No cohort entry found for transaction %s", name, transactionID)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); return; } @@ -104,8 +185,8 @@ public class ShardCommitCoordinator { if(currentCohortEntry != null) { // There's already a Tx commit in progress - attempt to queue this entry to be // committed after the current Tx completes. - LOG.debug("Transaction {} is already in progress - queueing transaction {}", - currentCohortEntry.getTransactionID(), transactionID); + log.debug("{}: Transaction {} is already in progress - queueing transaction {}", + name, currentCohortEntry.getTransactionID(), transactionID); if(queuedCohortEntries.size() < queueCapacity) { queuedCohortEntries.offer(cohortEntry); @@ -113,10 +194,10 @@ public class ShardCommitCoordinator { removeCohortEntry(transactionID); RuntimeException ex = new RuntimeException( - String.format("Could not enqueue transaction %s - the maximum commit queue"+ + String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+ " capacity %d has been reached.", - transactionID, queueCapacity)); - LOG.error(ex.getMessage()); + name, transactionID, queueCapacity)); + log.error(ex.getMessage()); sender.tell(new Status.Failure(ex), shard); } } else { @@ -138,14 +219,15 @@ public class ShardCommitCoordinator { Boolean canCommit = cohortEntry.getCohort().canCommit().get(); cohortEntry.getCanCommitSender().tell( - canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard()); + canCommit ? CanCommitTransactionReply.YES.toSerializable() : + CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard()); if(!canCommit) { // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); } } catch (InterruptedException | ExecutionException e) { - LOG.debug("An exception occurred during canCommit", e); + log.debug("{}: An exception occurred during canCommit: {}", name, e); // Remove the entry from the cache now since the Tx will be aborted. removeCohortEntry(cohortEntry.getTransactionID()); @@ -206,24 +288,39 @@ public class ShardCommitCoordinator { // Dequeue the next cohort entry waiting in the queue. currentCohortEntry = queuedCohortEntries.poll(); if(currentCohortEntry != null) { + currentCohortEntry.updateLastAccessTime(); doCanCommit(currentCohortEntry); } } } + @VisibleForTesting + void setCohortDecorator(CohortDecorator cohortDecorator) { + this.cohortDecorator = cohortDecorator; + } + + static class CohortEntry { private final String transactionID; - private final DOMStoreThreePhaseCommitCohort cohort; - private final Modification modification; + private DOMStoreThreePhaseCommitCohort cohort; + private final MutableCompositeModification compositeModification; + private final DOMStoreWriteTransaction transaction; private ActorRef canCommitSender; private ActorRef shard; private long lastAccessTime; + CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { + this.compositeModification = new MutableCompositeModification(); + this.transaction = transaction; + this.transactionID = transactionID; + } + CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.modification = modification; + this.compositeModification = compositeModification; + this.transaction = null; } void updateLastAccessTime() { @@ -242,8 +339,26 @@ public class ShardCommitCoordinator { return cohort; } - Modification getModification() { - return modification; + MutableCompositeModification getModification() { + return compositeModification; + } + + void applyModifications(Iterable modifications) { + for(Modification modification: modifications) { + compositeModification.addModification(modification); + modification.apply(transaction); + } + } + + void ready(CohortDecorator cohortDecorator) { + Preconditions.checkState(cohort == null, "cohort was already set"); + + cohort = transaction.ready(); + + if(cohortDecorator != null) { + // Call the hook for unit tests. + cohort = cohortDecorator.decorate(transactionID, cohort); + } } ActorRef getCanCommitSender() { @@ -261,5 +376,9 @@ public class ShardCommitCoordinator { void setShard(ActorRef shard) { this.shard = shard; } + + boolean hasModifications(){ + return compositeModification.getModifications().size() > 0; + } } }