import static java.util.Objects.requireNonNull;
import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
return null;
}
- private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
+ final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
+ LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
+ parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
+
+ updateShardInfo(shardName, primaryShardInfo);
+
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+ try {
+ if (localContext != null) {
+ LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
+ parent.getIdentifier());
+ return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
+ localContext);
+ } else {
+ LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
+ parent.getIdentifier());
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
+ remote.setPrimaryShard(primaryShardInfo);
+ return transactionContextWrapper;
+ }
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
+ }
+ }
+
+ private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
try {
- TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
if (localContext != null) {
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
- RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
- parent, shardName);
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
remote.setPrimaryShard(primaryShardInfo);
}
} finally {
}
}
- private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
try {
- transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier()));
+ transactionContextWrapper.executePriorTransactionOperations(
+ new NoOpTransactionContext(failure, parent.getIdentifier()));
} finally {
onTransactionContextCreated(parent.getIdentifier());
}
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
final String shardName) {
- final TransactionContextWrapper transactionContextWrapper =
- new TransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName);
-
- Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
+ final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
+ parent.getIdentifier(), actorUtils, shardName);
+ final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
if (findPrimaryFuture.isCompleted()) {
- Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
+ final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
if (maybe.isSuccess()) {
- onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
+ return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName,
+ contextWrapper);
} else {
- onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
+ onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName,
+ contextWrapper);
}
} else {
- findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
- @Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
- if (failure == null) {
- onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
- } else {
- onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
- }
+ findPrimaryFuture.onComplete((result) -> {
+ if (result.isSuccess()) {
+ onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
+ } else {
+ onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
}
+ return null;
}, actorUtils.getClientDispatcher());
}
-
- return transactionContextWrapper;
+ return contextWrapper;
}
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {