import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal);
+ transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}
private final ActorContext actorContext;
private final SchemaContext schemaContext;
+ private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
+ private final int remoteTransactionVersion;
- private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal) {
+ boolean isTxActorLocal, int remoteTransactionVersion) {
super(identifier);
+ this.transactionPath = transactionPath;
this.actor = actor;
this.actorContext = actorContext;
this.schemaContext = schemaContext;
this.isTxActorLocal = isTxActorLocal;
+ this.remoteTransactionVersion = remoteTransactionVersion;
}
private ActorSelection getActor() {
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- return actorContext.actorSelection(reply.getCohortPath());
+ String cohortPath = reply.getCohortPath();
+
+ // In Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+ cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+ }
+
+ return actorContext.actorSelection(cohortPath);
} else {
// Throwing an exception here will fail the Future.