import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
* <p>
private final TransactionType transactionType;
private final ActorContext actorContext;
private final TransactionIdentifier identifier;
- private final TransactionChainProxy transactionChainProxy;
+ private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
- this(actorContext, transactionType, null);
+ this(actorContext, transactionType, "");
}
public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
- TransactionChainProxy transactionChainProxy) {
+ String transactionChainId) {
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
"transactionType should not be null");
this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
"schemaContext should not be null");
- this.transactionChainProxy = transactionChainProxy;
+ this.transactionChainId = transactionChainId;
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
phantomReferenceCache.put(cleanup, cleanup);
}
- LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
}
@VisibleForTesting
return recordedOperationFutures;
}
+ @VisibleForTesting
+ boolean hasTransactionContext() {
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
- LOG.debug("Tx {} Readying transaction for shard {}", identifier,
- txFutureCallback.getShardName());
+ LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
+ txFutureCallback.getShardName(), transactionChainId);
TransactionContext transactionContext = txFutureCallback.getTransactionContext();
if(transactionContext != null) {
}
}
- if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohortFutures);
- }
+ onTransactionReady(cohortFutures);
return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
+ /**
+ * Method for derived classes to be notified when the transaction has been readied.
+ *
+ * @param cohortFutures the cohort Futures for each shard transaction.
+ */
+ protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+ }
+
+ /**
+ * Method called to send a CreateTransaction message to a shard.
+ *
+ * @param shard the shard actor to send to
+ * @param serializedCreateMessage the serialized message to send
+ * @return the response Future
+ */
+ protected Future<Object> sendCreateTransaction(ActorSelection shard,
+ Object serializedCreateMessage) {
+ return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+ }
+
@Override
public Object getIdentifier() {
return this.identifier;
}
public String getTransactionChainId() {
- if(transactionChainProxy == null){
- return "";
- }
- return transactionChainProxy.getTransactionChainId();
+ return transactionChainId;
}
/**
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+ Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable());
// respect to #addTxOperationOnComplete to handle timing issues and ensure no
// TransactionOperation is missed and that they are processed in the order they occurred.
synchronized(txOperationsOnComplete) {
+ // Store the new TransactionContext locally until we've completed invoking the
+ // TransactionOperations. This avoids thread timing issues which could cause
+ // out-of-order TransactionOperations. Eg, on a modification operation, if the
+ // TransactionContext is non-null, then we directly call the TransactionContext.
+ // However, at the same time, the code may be executing the cached
+ // TransactionOperations. So to avoid thus timing, we don't publish the
+ // TransactionContext until after we've executed all cached TransactionOperations.
+ TransactionContext localTransactionContext;
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
failure.getMessage());
- transactionContext = new NoOpTransactionContext(failure, identifier);
+ localTransactionContext = new NoOpTransactionContext(failure, identifier);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
- createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+ localTransactionContext = createValidTransactionContext(
+ CreateTransactionReply.fromSerializable(response));
} else {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- transactionContext = new NoOpTransactionContext(exception, identifier);
+ localTransactionContext = new NoOpTransactionContext(exception, identifier);
}
for(TransactionOperation oper: txOperationsOnComplete) {
- oper.invoke(transactionContext);
+ oper.invoke(localTransactionContext);
}
txOperationsOnComplete.clear();
+
+ // We're done invoking the TransactionOperations so we can now publish the
+ // TransactionContext.
+ transactionContext = localTransactionContext;
}
}
- private void createValidTransactionContext(CreateTransactionReply reply) {
+ private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
String transactionPath = reply.getTransactionPath();
LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ return new TransactionContextImpl(transactionPath, transactionActor, identifier,
actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}