X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=4276f3be52960e0ff9c68cd50adc5d9e591c4db6;hp=71d9dabf37afd6c5390d6231a4151aba21722dea;hb=14c92df74247c884a43c5aaea2f154992b0ec798;hpb=103ceecd0195cca6c87fbd62a687d8addf128784 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 71d9dabf37..4276f3be52 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -8,17 +8,21 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.OnComplete; -import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; +import akka.pattern.AskTimeoutException; +import akka.util.Timeout; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -28,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration; * Handles creation of TransactionContext instances for remote transactions. This class creates * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit, * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay. - *

+ *

* The end result from a completed CreateTransaction message is a TransactionContext that is * used to perform transaction operations. Transaction operations that occur before the * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the @@ -37,10 +41,8 @@ import scala.concurrent.duration.FiniteDuration; final class RemoteTransactionContextSupport { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class); - /** - * Time interval in between transaction create retries. - */ - private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); + private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000; + private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000; private final TransactionProxy parent; private final String shardName; @@ -48,19 +50,34 @@ final class RemoteTransactionContextSupport { /** * The target primary shard. */ - private volatile ActorSelection primaryShard; - private volatile int createTxTries; + private volatile PrimaryShardInfo primaryShardInfo; + + /** + * The total timeout for creating a tx on the primary shard. + */ + private volatile long totalCreateTxTimeout; - private final TransactionContextWrapper transactionContextAdapter; + private final Timeout createTxMessageTimeout; - RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextAdapter, final TransactionProxy parent, - final String shardName) { - this.parent = Preconditions.checkNotNull(parent); + private final TransactionContextWrapper transactionContextWrapper; + + RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, + final TransactionProxy parent, final String shardName) { + this.parent = requireNonNull(parent); this.shardName = shardName; - this.transactionContextAdapter = transactionContextAdapter; - createTxTries = (int) (parent.getActorContext().getDatastoreContext(). - getShardLeaderElectionTimeout().duration().toMillis() / - CREATE_TX_TRY_INTERVAL.toMillis()); + this.transactionContextWrapper = transactionContextWrapper; + + // For the total create tx timeout, use 2 times the election timeout. This should be enough time for + // a leader re-election to occur if we happen to hit it in transition. + totalCreateTxTimeout = parent.getActorUtils().getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2; + + // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately + // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set + // larger than the totalCreateTxTimeout in production which we don't want. + long operationTimeout = parent.getActorUtils().getOperationTimeout().duration().toMillis(); + createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS), + TimeUnit.MILLISECONDS); } String getShardName() { @@ -71,12 +88,8 @@ final class RemoteTransactionContextSupport { return parent.getType(); } - private ActorContext getActorContext() { - return parent.getActorContext(); - } - - private Semaphore getOperationLimiter() { - return parent.getLimiter(); + private ActorUtils getActorUtils() { + return parent.getActorUtils(); } private TransactionIdentifier getIdentifier() { @@ -86,70 +99,106 @@ final class RemoteTransactionContextSupport { /** * Sets the target primary shard and initiates a CreateTransaction try. */ - void setPrimaryShard(ActorSelection primaryShard) { - this.primaryShard = primaryShard; + void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) { + this.primaryShardInfo = newPrimaryShardInfo; + + if (getTransactionType() == TransactionType.WRITE_ONLY + && getActorUtils().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor(); - if (getTransactionType() == TransactionType.WRITE_ONLY && - getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. - // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. - transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard, - this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); + transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext( + primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion())); } else { tryCreateTransaction(); } } /** - * Performs a CreateTransaction try async. + Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); - } + LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), + primaryShardInfo.getPrimaryShardActor()); - Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), - getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable(); + Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(), + primaryShardInfo.getPrimaryShardVersion()).toSerializable(); - Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage); + Future createTxFuture = getActorUtils().executeOperationAsync( + primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout); createTxFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { onCreateTransactionComplete(failure, response); } - }, getActorContext().getClientDispatcher()); + }, getActorUtils().getClientDispatcher()); + } + + private void tryFindPrimaryShard() { + LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName); + + this.primaryShardInfo = null; + Future findPrimaryFuture = getActorUtils().findPrimaryShardAsync(shardName); + findPrimaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) { + onFindPrimaryShardComplete(failure, newPrimaryShardInfo); + } + }, getActorUtils().getClientDispatcher()); + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) { + if (failure == null) { + this.primaryShardInfo = newPrimaryShardInfo; + tryCreateTransaction(); + } else { + LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure); + + onCreateTransactionComplete(failure, null); + } } - private void onCreateTransactionComplete(Throwable failure, Object response) { - if(failure instanceof NoShardLeaderException) { - // There's no leader for the shard yet - schedule and try again, unless we're out - // of retries. Note: createTxTries is volatile as it may be written by different - // threads however not concurrently, therefore decrementing it non-atomically here - // is ok. - if(--createTxTries > 0) { - LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", - getIdentifier(), shardName); - - getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, - new Runnable() { - @Override - public void run() { - tryCreateTransaction(); - } - }, getActorContext().getClientDispatcher()); - return; + private void onCreateTransactionComplete(final Throwable failure, final Object response) { + // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or + // the cached remote leader actor is no longer available. + boolean retryCreateTransaction = primaryShardInfo != null + && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException); + + // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may + // be written by different threads however not concurrently, therefore decrementing it + // non-atomically here is ok. + if (retryCreateTransaction && totalCreateTxTimeout > 0) { + long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS; + if (failure instanceof AskTimeoutException) { + // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed + // out, subtract it from the total timeout. Also since the createTxMessageTimeout period + // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate). + totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis(); + scheduleInterval = 10; } + + totalCreateTxTimeout -= scheduleInterval; + + LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms", + getIdentifier(), shardName, failure, scheduleInterval); + + getActorUtils().getActorSystem().scheduler().scheduleOnce( + FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS), + this::tryFindPrimaryShard, getActorUtils().getClientDispatcher()); + return; } createTransactionContext(failure, response); } - private void createTransactionContext(Throwable failure, Object response) { + private void createTransactionContext(final Throwable failure, final Object response) { // Create the TransactionContext from the response or failure. Store the new // TransactionContext locally until we've completed invoking the // TransactionOperations. This avoids thread timing issues which could cause @@ -159,47 +208,47 @@ final class RemoteTransactionContextSupport { // TransactionOperations. So to avoid thus timing, we don't publish the // TransactionContext until after we've executed all cached TransactionOperations. TransactionContext localTransactionContext; - if(failure != null) { + if (failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter()); - } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { + Throwable resultingEx = failure; + if (failure instanceof AskTimeoutException) { + resultingEx = new ShardLeaderNotRespondingException(String.format( + "Could not create a %s transaction on shard %s. The shard leader isn't responding.", + parent.getType(), shardName), failure); + } else if (!(failure instanceof NoShardLeaderException)) { + resultingEx = new Exception(String.format( + "Error creating %s transaction on shard %s", parent.getType(), shardName), failure); + } + + localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier()); + } else if (CreateTransactionReply.isSerializedType(response)) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); } else { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter()); + localTransactionContext = new NoOpTransactionContext(exception, getIdentifier()); } - transactionContextAdapter.executePriorTransactionOperations(localTransactionContext); + transactionContextWrapper.executePriorTransactionOperations(localTransactionContext); } - private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { + private TransactionContext createValidTransactionContext(final CreateTransactionReply reply) { LOG.debug("Tx {} Received {}", getIdentifier(), reply); - return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()), - reply.getTransactionPath(), reply.getVersion()); + return createValidTransactionContext(getActorUtils().actorSelection(reply.getTransactionPath()), + reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion()); } - private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath, - short remoteTransactionVersion) { - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath); - final TransactionContext ret; - - if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); - } else { - ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(), - isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); - } + private TransactionContext createValidTransactionContext(final ActorSelection transactionActor, + final String transactionPath, final short remoteTransactionVersion) { + final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(), + transactionActor, getActorUtils(), remoteTransactionVersion, transactionContextWrapper.getLimiter()); - if(parent.getType() == TransactionType.READ_ONLY) { - TransactionContextCleanup.track(this, ret); + if (parent.getType() == TransactionType.READ_ONLY) { + TransactionContextCleanup.track(parent, ret); } return ret;