X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=c1162c2d9385f0b40e644adbf09cfba1a83c07a2;hb=9b235df8e0b4d8c4c7419419538188cdf7b2bfc2;hp=5bafef8a854af9bfed716bdc4a220b8c53998fe5;hpb=5e7cf2452ef634dc934a3ea5a2dd95059fbab68c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 5bafef8a85..c1162c2d93 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -97,12 +97,12 @@ final class RemoteTransactionContextSupport { /** * Sets the target primary shard and initiates a CreateTransaction try. */ - void setPrimaryShard(PrimaryShardInfo primaryShardInfo) { - this.primaryShardInfo = primaryShardInfo; + void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) { + this.primaryShardInfo = newPrimaryShardInfo; if (getTransactionType() == TransactionType.WRITE_ONLY && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { - ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor(); + ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor(); LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); @@ -110,7 +110,7 @@ final class RemoteTransactionContextSupport { // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext( - primaryShard, String.valueOf(primaryShard.path()), primaryShardInfo.getPrimaryShardVersion())); + primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion())); } else { tryCreateTransaction(); } @@ -131,7 +131,7 @@ final class RemoteTransactionContextSupport { createTxFuture.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { onCreateTransactionComplete(failure, response); } }, getActorContext().getClientDispatcher()); @@ -161,41 +161,40 @@ final class RemoteTransactionContextSupport { } } - private void onCreateTransactionComplete(Throwable failure, Object response) { + private void onCreateTransactionComplete(final Throwable failure, final Object response) { // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or // the cached remote leader actor is no longer available. boolean retryCreateTransaction = primaryShardInfo != null && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException); - if (retryCreateTransaction) { - // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may - // be written by different threads however not concurrently, therefore decrementing it - // non-atomically here is ok. - if (totalCreateTxTimeout > 0) { - long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS; - if (failure instanceof AskTimeoutException) { - // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed - // out, subtract it from the total timeout. Also since the createTxMessageTimeout period - // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate). - totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis(); - scheduleInterval = 10; - } - - totalCreateTxTimeout -= scheduleInterval; - - LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms", - getIdentifier(), shardName, failure, scheduleInterval); - - getActorContext().getActorSystem().scheduler().scheduleOnce( - FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS), - this::tryFindPrimaryShard, getActorContext().getClientDispatcher()); - return; + + // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may + // be written by different threads however not concurrently, therefore decrementing it + // non-atomically here is ok. + if (retryCreateTransaction && totalCreateTxTimeout > 0) { + long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS; + if (failure instanceof AskTimeoutException) { + // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed + // out, subtract it from the total timeout. Also since the createTxMessageTimeout period + // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate). + totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis(); + scheduleInterval = 10; } + + totalCreateTxTimeout -= scheduleInterval; + + LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms", + getIdentifier(), shardName, failure, scheduleInterval); + + getActorContext().getActorSystem().scheduler().scheduleOnce( + FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS), + this::tryFindPrimaryShard, getActorContext().getClientDispatcher()); + return; } createTransactionContext(failure, response); } - private void createTransactionContext(Throwable failure, Object response) { + private void createTransactionContext(final Throwable failure, final Object response) { // Create the TransactionContext from the response or failure. Store the new // TransactionContext locally until we've completed invoking the // TransactionOperations. This avoids thread timing issues which could cause @@ -232,15 +231,15 @@ final class RemoteTransactionContextSupport { transactionContextWrapper.executePriorTransactionOperations(localTransactionContext); } - private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { + private TransactionContext createValidTransactionContext(final CreateTransactionReply reply) { LOG.debug("Tx {} Received {}", getIdentifier(), reply); return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()), reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion()); } - private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath, - short remoteTransactionVersion) { + private TransactionContext createValidTransactionContext(final ActorSelection transactionActor, + final String transactionPath, final short remoteTransactionVersion) { final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(), transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter());