X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=88c797a99074c366b63a703cc583f64c1608d310;hp=59205692d119ffc9183a7c505f39c43008ea28db;hb=5c5c980e564d2b5f6cd26821ffd26997f59af260;hpb=fed267bf1b8a9ea81d1ee7c9721962863b98e391 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 59205692d1..88c797a990 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -10,13 +10,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +40,8 @@ import scala.concurrent.duration.FiniteDuration; final class RemoteTransactionContextSupport { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class); - /** - * Time interval in between transaction create retries. - */ - private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); + private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000; + private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000; private final TransactionProxy parent; private final String shardName; @@ -48,7 +50,13 @@ final class RemoteTransactionContextSupport { * The target primary shard. */ private volatile ActorSelection primaryShard; - private volatile int createTxTries; + + /** + * The total timeout for creating a tx on the primary shard. + */ + private volatile long totalCreateTxTimeout; + + private final Timeout createTxMessageTimeout; private final TransactionContextWrapper transactionContextWrapper; @@ -57,9 +65,18 @@ final class RemoteTransactionContextSupport { this.parent = Preconditions.checkNotNull(parent); this.shardName = shardName; this.transactionContextWrapper = transactionContextWrapper; - createTxTries = (int) (parent.getActorContext().getDatastoreContext(). - getShardLeaderElectionTimeout().duration().toMillis() / - CREATE_TX_TRY_INTERVAL.toMillis()); + + // For the total create tx timeout, use 2 times the election timeout. This should be enough time for + // a leader re-election to occur if we happen to hit it in transition. + totalCreateTxTimeout = parent.getActorContext().getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2; + + // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately + // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set + // larger than the totalCreateTxTimeout in production which we don't want. + long operationTimeout = parent.getActorContext().getOperationTimeout().duration().toMillis(); + createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS), + TimeUnit.MILLISECONDS); } String getShardName() { @@ -74,10 +91,6 @@ final class RemoteTransactionContextSupport { return parent.getActorContext(); } - private OperationLimiter getOperationLimiter() { - return transactionContextWrapper.getLimiter(); - } - private TransactionIdentifier getIdentifier() { return parent.getIdentifier(); } @@ -113,7 +126,8 @@ final class RemoteTransactionContextSupport { Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable(); - Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage); + Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, + serializedCreateMessage, createTxMessageTimeout); createTxFuture.onComplete(new OnComplete() { @Override @@ -123,21 +137,60 @@ final class RemoteTransactionContextSupport { }, getActorContext().getClientDispatcher()); } + private void tryFindPrimaryShard() { + LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName); + + this.primaryShard = null; + Future findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName); + findPrimaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { + onFindPrimaryShardComplete(failure, primaryShardInfo); + } + }, getActorContext().getClientDispatcher()); + } + + private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { + if (failure == null) { + this.primaryShard = primaryShardInfo.getPrimaryShardActor(); + tryCreateTransaction(); + } else { + LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure); + + onCreateTransactionComplete(failure, null); + } + } + private void onCreateTransactionComplete(Throwable failure, Object response) { - if(failure instanceof NoShardLeaderException) { - // There's no leader for the shard yet - schedule and try again, unless we're out - // of retries. Note: createTxTries is volatile as it may be written by different - // threads however not concurrently, therefore decrementing it non-atomically here - // is ok. - if(--createTxTries > 0) { - LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", - getIdentifier(), shardName); - - getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, + // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or + // the cached remote leader actor is no longer available. + boolean retryCreateTransaction = this.primaryShard != null && + (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException); + if(retryCreateTransaction) { + // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may + // be written by different threads however not concurrently, therefore decrementing it + // non-atomically here is ok. + if(totalCreateTxTimeout > 0) { + long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS; + if(failure instanceof AskTimeoutException) { + // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed + // out, subtract it from the total timeout. Also since the createTxMessageTimeout period + // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate). + totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis(); + scheduleInterval = 10; + } + + totalCreateTxTimeout -= scheduleInterval; + + LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms", + getIdentifier(), shardName, failure, scheduleInterval); + + getActorContext().getActorSystem().scheduler().scheduleOnce( + FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS), new Runnable() { @Override public void run() { - tryCreateTransaction(); + tryFindPrimaryShard(); } }, getActorContext().getClientDispatcher()); return; @@ -160,7 +213,17 @@ final class RemoteTransactionContextSupport { if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - localTransactionContext = new NoOpTransactionContext(failure, getIdentifier()); + Throwable resultingEx = failure; + if(failure instanceof AskTimeoutException) { + resultingEx = new ShardLeaderNotRespondingException(String.format( + "Could not create a %s transaction on shard %s. The shard leader isn't responding.", + parent.getType(), shardName), failure); + } else if(!(failure instanceof NoShardLeaderException)) { + resultingEx = new Exception(String.format( + "Error creating %s transaction on shard %s", parent.getType(), shardName), failure); + } + + localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier()); } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response));