X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=d93bae22e08d9fddb3f1ac7d9c12aa4a278d0ff6;hb=7d9021752d0def517a4bd129743474cc0993e4e2;hp=ffb1ab7c55064ecf7683e1857800b256c9c82a1d;hpb=6a4c3c11f68c52d00d2bc7f0b30b086113ebe859;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index ffb1ab7c55..d93bae22e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -50,15 +58,6 @@ import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; -import javax.annotation.concurrent.GuardedBy; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard *

@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final TransactionType transactionType; private final ActorContext actorContext; private final TransactionIdentifier identifier; - private final TransactionChainProxy transactionChainProxy; + private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, null); + this(actorContext, transactionType, ""); } public TransactionProxy(ActorContext actorContext, TransactionType transactionType, - TransactionChainProxy transactionChainProxy) { + String transactionChainId) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), "schemaContext should not be null"); - this.transactionChainProxy = transactionChainProxy; + this.transactionChainId = transactionChainId; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -237,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return recordedOperationFutures; } + @VisibleForTesting + boolean hasTransactionContext() { + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + return true; + } + } + + return false; + } + @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { @@ -433,14 +444,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - if(transactionChainProxy != null){ - transactionChainProxy.onTransactionReady(cohortFutures); - } + onTransactionReady(cohortFutures); return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, identifier.toString()); } + /** + * Method for derived classes to be notified when the transaction has been readied. + * + * @param cohortFutures the cohort Futures for each shard transaction. + */ + protected void onTransactionReady(List> cohortFutures) { + } + + /** + * Method called to send a CreateTransaction message to a shard. + * + * @param shard the shard actor to send to + * @param serializedCreateMessage the serialized message to send + * @return the response Future + */ + protected Future sendCreateTransaction(ActorSelection shard, + Object serializedCreateMessage) { + return actorContext.executeOperationAsync(shard, serializedCreateMessage); + } + @Override public Object getIdentifier() { return this.identifier; @@ -502,10 +531,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } public String getTransactionChainId() { - if(transactionChainProxy == null){ - return ""; - } - return transactionChainProxy.getTransactionChainId(); + return transactionChainId; } /** @@ -591,7 +617,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = actorContext.executeOperationAsync(primaryShard, + Future createTxFuture = sendCreateTransaction(primaryShard, new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); @@ -626,29 +652,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // respect to #addTxOperationOnComplete to handle timing issues and ensure no // TransactionOperation is missed and that they are processed in the order they occurred. synchronized(txOperationsOnComplete) { + // Store the new TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, failure.getMessage()); - transactionContext = new NoOpTransactionContext(failure, identifier); + localTransactionContext = new NoOpTransactionContext(failure, identifier); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - createValidTransactionContext(CreateTransactionReply.fromSerializable(response)); + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); } else { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - transactionContext = new NoOpTransactionContext(exception, identifier); + localTransactionContext = new NoOpTransactionContext(exception, identifier); } for(TransactionOperation oper: txOperationsOnComplete) { - oper.invoke(transactionContext); + oper.invoke(localTransactionContext); } txOperationsOnComplete.clear(); + + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; } } - private void createValidTransactionContext(CreateTransactionReply reply) { + private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); @@ -669,7 +708,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); - transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier, + return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion()); } }