X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=1e0d1279e3e749b9728a3d8cdfc1d61ec59facbc;hb=4447f81c26b851e46acd3f111768bb498f0d553f;hp=2924eaab574f2e3bb95b30877d7bef2328a0d6cf;hpb=4d1709660b7af992d4c382a2a38debb5c7d64fb9;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
index 2924eaab57..1e0d1279e3 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
@@ -30,7 +30,7 @@ import scala.concurrent.duration.FiniteDuration;
* Handles creation of TransactionContext instances for remote transactions. This class creates
* remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
* if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
- *
+ *
* The end result from a completed CreateTransaction message is a TransactionContext that is
* used to perform transaction operations. Transaction operations that occur before the
* CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
@@ -59,8 +59,8 @@ final class RemoteTransactionContextSupport {
private final TransactionContextWrapper transactionContextWrapper;
- RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, final TransactionProxy parent,
- final String shardName) {
+ RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper,
+ final TransactionProxy parent, final String shardName) {
this.parent = Preconditions.checkNotNull(parent);
this.shardName = shardName;
this.transactionContextWrapper = transactionContextWrapper;
@@ -100,8 +100,8 @@ final class RemoteTransactionContextSupport {
void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
this.primaryShardInfo = primaryShardInfo;
- if (getTransactionType() == TransactionType.WRITE_ONLY &&
- getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ if (getTransactionType() == TransactionType.WRITE_ONLY
+ && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
@@ -120,10 +120,8 @@ final class RemoteTransactionContextSupport {
Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
- primaryShardInfo.getPrimaryShardActor());
- }
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor());
Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
primaryShardInfo.getPrimaryShardVersion()).toSerializable();
@@ -146,15 +144,15 @@ final class RemoteTransactionContextSupport {
Future findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName);
findPrimaryFuture.onComplete(new OnComplete() {
@Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
- onFindPrimaryShardComplete(failure, primaryShardInfo);
+ public void onComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
+ onFindPrimaryShardComplete(failure, newPrimaryShardInfo);
}
}, getActorContext().getClientDispatcher());
}
- private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+ private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
if (failure == null) {
- this.primaryShardInfo = primaryShardInfo;
+ this.primaryShardInfo = newPrimaryShardInfo;
tryCreateTransaction();
} else {
LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
@@ -166,15 +164,15 @@ final class RemoteTransactionContextSupport {
private void onCreateTransactionComplete(Throwable failure, Object response) {
// An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
// the cached remote leader actor is no longer available.
- boolean retryCreateTransaction = primaryShardInfo != null &&
- (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
- if(retryCreateTransaction) {
+ boolean retryCreateTransaction = primaryShardInfo != null
+ && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
+ if (retryCreateTransaction) {
// Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
// be written by different threads however not concurrently, therefore decrementing it
// non-atomically here is ok.
- if(totalCreateTxTimeout > 0) {
+ if (totalCreateTxTimeout > 0) {
long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
- if(failure instanceof AskTimeoutException) {
+ if (failure instanceof AskTimeoutException) {
// Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
// out, subtract it from the total timeout. Also since the createTxMessageTimeout period
// has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
@@ -188,13 +186,8 @@ final class RemoteTransactionContextSupport {
getIdentifier(), shardName, failure, scheduleInterval);
getActorContext().getActorSystem().scheduler().scheduleOnce(
- FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
- new Runnable() {
- @Override
- public void run() {
- tryFindPrimaryShard();
- }
- }, getActorContext().getClientDispatcher());
+ FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
+ () -> tryFindPrimaryShard(), getActorContext().getClientDispatcher());
return;
}
}
@@ -212,15 +205,15 @@ final class RemoteTransactionContextSupport {
// TransactionOperations. So to avoid thus timing, we don't publish the
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
- if(failure != null) {
+ if (failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
Throwable resultingEx = failure;
- if(failure instanceof AskTimeoutException) {
+ if (failure instanceof AskTimeoutException) {
resultingEx = new ShardLeaderNotRespondingException(String.format(
"Could not create a %s transaction on shard %s. The shard leader isn't responding.",
parent.getType(), shardName), failure);
- } else if(!(failure instanceof NoShardLeaderException)) {
+ } else if (!(failure instanceof NoShardLeaderException)) {
resultingEx = new Exception(String.format(
"Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
}
@@ -251,7 +244,7 @@ final class RemoteTransactionContextSupport {
final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
- if(parent.getType() == TransactionType.READ_ONLY) {
+ if (parent.getType() == TransactionType.READ_ONLY) {
TransactionContextCleanup.track(parent, ret);
}