X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FRemoteTransactionContextSupport.java;h=b32ef452756ddb689803fda8b4b60c8135cad793;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hp=8f5aade018aa936eb9b3a42fcc4e9a4539961afb;hpb=3e80db38f7f579505173c29c42f800983d7ca6c1;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
index 8f5aade018..b32ef45275 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
@@ -8,19 +8,20 @@
*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
-import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
@@ -30,10 +31,10 @@ import scala.concurrent.duration.FiniteDuration;
* Handles creation of TransactionContext instances for remote transactions. This class creates
* remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
* if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
- *
+ *
* The end result from a completed CreateTransaction message is a TransactionContext that is
* used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
+ * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the
* CreateTransaction completes, successfully or not.
*/
final class RemoteTransactionContextSupport {
@@ -57,23 +58,23 @@ final class RemoteTransactionContextSupport {
private final Timeout createTxMessageTimeout;
- private final TransactionContextWrapper transactionContextWrapper;
+ private final DelayedTransactionContextWrapper transactionContextWrapper;
- RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, final TransactionProxy parent,
- final String shardName) {
- this.parent = Preconditions.checkNotNull(parent);
+ RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper,
+ final TransactionProxy parent, final String shardName) {
+ this.parent = requireNonNull(parent);
this.shardName = shardName;
this.transactionContextWrapper = transactionContextWrapper;
// For the total create tx timeout, use 2 times the election timeout. This should be enough time for
// a leader re-election to occur if we happen to hit it in transition.
- totalCreateTxTimeout = parent.getActorContext().getDatastoreContext().getShardRaftConfig()
+ totalCreateTxTimeout = parent.getActorUtils().getDatastoreContext().getShardRaftConfig()
.getElectionTimeOutInterval().toMillis() * 2;
// We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately
// for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set
// larger than the totalCreateTxTimeout in production which we don't want.
- long operationTimeout = parent.getActorContext().getOperationTimeout().duration().toMillis();
+ long operationTimeout = parent.getActorUtils().getOperationTimeout().duration().toMillis();
createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS),
TimeUnit.MILLISECONDS);
}
@@ -86,8 +87,8 @@ final class RemoteTransactionContextSupport {
return parent.getType();
}
- private ActorContext getActorContext() {
- return parent.getActorContext();
+ private ActorUtils getActorUtils() {
+ return parent.getActorUtils();
}
private TransactionIdentifier getIdentifier() {
@@ -97,12 +98,12 @@ final class RemoteTransactionContextSupport {
/**
* Sets the target primary shard and initiates a CreateTransaction try.
*/
- void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
- this.primaryShardInfo = primaryShardInfo;
+ void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) {
+ primaryShardInfo = newPrimaryShardInfo;
- if (getTransactionType() == TransactionType.WRITE_ONLY &&
- getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();
+ if (getTransactionType() == TransactionType.WRITE_ONLY
+ && getActorUtils().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor();
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
getIdentifier(), primaryShard);
@@ -110,52 +111,49 @@ final class RemoteTransactionContextSupport {
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
- primaryShard, primaryShard.path().toString(), primaryShardInfo.getPrimaryShardVersion()));
+ primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion()));
} else {
tryCreateTransaction();
}
}
/**
- * Performs a CreateTransaction try async.
+ Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
- primaryShardInfo.getPrimaryShardActor());
- }
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor());
- Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
- getTransactionType().ordinal(), getIdentifier().getChainId(),
+ Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
primaryShardInfo.getPrimaryShardVersion()).toSerializable();
- Future