import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
private final TransactionType transactionType;
private final ActorContext actorContext;
private final TransactionIdentifier identifier;
- private final TransactionChainProxy transactionChainProxy;
+ private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
- this(actorContext, transactionType, null);
+ this(actorContext, transactionType, "");
}
public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
- TransactionChainProxy transactionChainProxy) {
+ String transactionChainId) {
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
"transactionType should not be null");
this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
"schemaContext should not be null");
- this.transactionChainProxy = transactionChainProxy;
+ this.transactionChainId = transactionChainId;
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
}
}
- if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohortFutures);
- }
+ onTransactionReady(cohortFutures);
return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
+ /**
+ * Method for derived classes to be notified when the transaction has been readied.
+ *
+ * @param cohortFutures the cohort Futures for each shard transaction.
+ */
+ protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+ }
+
+ /**
+ * Method called to send a CreateTransaction message to a shard.
+ *
+ * @param shard the shard actor to send to
+ * @param serializedCreateMessage the serialized message to send
+ * @return the response Future
+ */
+ protected Future<Object> sendCreateTransaction(ActorSelection shard,
+ Object serializedCreateMessage) {
+ return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+ }
+
@Override
public Object getIdentifier() {
return this.identifier;
}
public String getTransactionChainId() {
- if(transactionChainProxy == null){
- return "";
- }
- return transactionChainProxy.getTransactionChainId();
+ return transactionChainId;
}
/**
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+ Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable());
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal);
+ transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}
private final ActorContext actorContext;
private final SchemaContext schemaContext;
+ private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
+ private final int remoteTransactionVersion;
- private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal) {
+ boolean isTxActorLocal, int remoteTransactionVersion) {
super(identifier);
+ this.transactionPath = transactionPath;
this.actor = actor;
this.actorContext = actorContext;
this.schemaContext = schemaContext;
this.isTxActorLocal = isTxActorLocal;
+ this.remoteTransactionVersion = remoteTransactionVersion;
}
private ActorSelection getActor() {
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- return actorContext.actorSelection(reply.getCohortPath());
+ String cohortPath = reply.getCohortPath();
+
+ // In Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+ cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+ }
+
+ return actorContext.actorSelection(cohortPath);
} else {
// Throwing an exception here will fail the Future.