X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=c1162c2d9385f0b40e644adbf09cfba1a83c07a2;hb=9b235df8e0b4d8c4c7419419538188cdf7b2bfc2;hp=59205692d119ffc9183a7c505f39c43008ea28db;hpb=380b9ee20a2032cd223e66dded8204519540803f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 59205692d1..c1162c2d93 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -10,13 +10,16 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +30,7 @@ import scala.concurrent.duration.FiniteDuration; * Handles creation of TransactionContext instances for remote transactions. This class creates * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit, * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay. - *
+ *
* The end result from a completed CreateTransaction message is a TransactionContext that is * used to perform transaction operations. Transaction operations that occur before the * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the @@ -36,10 +39,8 @@ import scala.concurrent.duration.FiniteDuration; final class RemoteTransactionContextSupport { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class); - /** - * Time interval in between transaction create retries. - */ - private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); + private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000; + private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000; private final TransactionProxy parent; private final String shardName; @@ -47,19 +48,34 @@ final class RemoteTransactionContextSupport { /** * The target primary shard. */ - private volatile ActorSelection primaryShard; - private volatile int createTxTries; + private volatile PrimaryShardInfo primaryShardInfo; + + /** + * The total timeout for creating a tx on the primary shard. + */ + private volatile long totalCreateTxTimeout; + + private final Timeout createTxMessageTimeout; private final TransactionContextWrapper transactionContextWrapper; - RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, final TransactionProxy parent, - final String shardName) { + RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, + final TransactionProxy parent, final String shardName) { this.parent = Preconditions.checkNotNull(parent); this.shardName = shardName; this.transactionContextWrapper = transactionContextWrapper; - createTxTries = (int) (parent.getActorContext().getDatastoreContext(). - getShardLeaderElectionTimeout().duration().toMillis() / - CREATE_TX_TRY_INTERVAL.toMillis()); + + // For the total create tx timeout, use 2 times the election timeout. This should be enough time for + // a leader re-election to occur if we happen to hit it in transition. + totalCreateTxTimeout = parent.getActorContext().getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2; + + // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately + // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set + // larger than the totalCreateTxTimeout in production which we don't want. + long operationTimeout = parent.getActorContext().getOperationTimeout().duration().toMillis(); + createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS), + TimeUnit.MILLISECONDS); } String getShardName() { @@ -74,10 +90,6 @@ final class RemoteTransactionContextSupport { return parent.getActorContext(); } - private OperationLimiter getOperationLimiter() { - return transactionContextWrapper.getLimiter(); - } - private TransactionIdentifier getIdentifier() { return parent.getIdentifier(); } @@ -85,69 +97,104 @@ final class RemoteTransactionContextSupport { /** * Sets the target primary shard and initiates a CreateTransaction try. */ - void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) { - this.primaryShard = primaryShard; + void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) { + this.primaryShardInfo = newPrimaryShardInfo; + + if (getTransactionType() == TransactionType.WRITE_ONLY + && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor(); - if (getTransactionType() == TransactionType.WRITE_ONLY && primaryVersion >= DataStoreVersions.LITHIUM_VERSION && - getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. - transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard, - this.primaryShard.path().toString(), primaryVersion)); + transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext( + primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion())); } else { tryCreateTransaction(); } } /** - * Performs a CreateTransaction try async. + Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); - } + LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), + primaryShardInfo.getPrimaryShardActor()); - Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), - getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable(); + Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(), + primaryShardInfo.getPrimaryShardVersion()).toSerializable(); - Future