X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=2924eaab574f2e3bb95b30877d7bef2328a0d6cf;hb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4;hp=4ce0767d1df7906e2b6bbeb82e787b2ac6e17772;hpb=28b2fd303b8e8bc757de6ead454ae06469113b34;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 4ce0767d1d..2924eaab57 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -14,9 +14,9 @@ import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.google.common.base.Preconditions; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; @@ -48,7 +48,7 @@ final class RemoteTransactionContextSupport { /** * The target primary shard. */ - private volatile ActorSelection primaryShard; + private volatile PrimaryShardInfo primaryShardInfo; /** * The total timeout for creating a tx on the primary shard. @@ -97,36 +97,39 @@ final class RemoteTransactionContextSupport { /** * Sets the target primary shard and initiates a CreateTransaction try. */ - void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) { - this.primaryShard = primaryShard; + void setPrimaryShard(PrimaryShardInfo primaryShardInfo) { + this.primaryShardInfo = primaryShardInfo; if (getTransactionType() == TransactionType.WRITE_ONLY && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor(); + LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. - transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard, - this.primaryShard.path().toString(), primaryVersion)); + transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext( + primaryShard, String.valueOf(primaryShard.path()), primaryShardInfo.getPrimaryShardVersion())); } else { tryCreateTransaction(); } } /** - * Performs a CreateTransaction try async. + Performs a CreateTransaction try async. */ private void tryCreateTransaction() { if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); + LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), + primaryShardInfo.getPrimaryShardActor()); } - Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), - getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable(); + Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(), + primaryShardInfo.getPrimaryShardVersion()).toSerializable(); - Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, - serializedCreateMessage, createTxMessageTimeout); + Future createTxFuture = getActorContext().executeOperationAsync( + primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout); createTxFuture.onComplete(new OnComplete() { @Override @@ -139,7 +142,7 @@ final class RemoteTransactionContextSupport { private void tryFindPrimaryShard() { LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName); - this.primaryShard = null; + this.primaryShardInfo = null; Future findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName); findPrimaryFuture.onComplete(new OnComplete() { @Override @@ -151,7 +154,7 @@ final class RemoteTransactionContextSupport { private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { if (failure == null) { - this.primaryShard = primaryShardInfo.getPrimaryShardActor(); + this.primaryShardInfo = primaryShardInfo; tryCreateTransaction(); } else { LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure); @@ -163,7 +166,7 @@ final class RemoteTransactionContextSupport { private void onCreateTransactionComplete(Throwable failure, Object response) { // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or // the cached remote leader actor is no longer available. - boolean retryCreateTransaction = this.primaryShard != null && + boolean retryCreateTransaction = primaryShardInfo != null && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException); if(retryCreateTransaction) { // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may @@ -223,7 +226,7 @@ final class RemoteTransactionContextSupport { } localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier()); - } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { + } else if (CreateTransactionReply.isSerializedType(response)) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); } else { @@ -240,20 +243,16 @@ final class RemoteTransactionContextSupport { LOG.debug("Tx {} Received {}", getIdentifier(), reply); return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()), - reply.getTransactionPath(), reply.getVersion()); + reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion()); } private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath, short remoteTransactionVersion) { - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath); final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(), - transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion, - transactionContextWrapper.getLimiter()); + transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter()); if(parent.getType() == TransactionType.READ_ONLY) { - TransactionContextCleanup.track(this, ret); + TransactionContextCleanup.track(parent, ret); } return ret;