() {
- @Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
- if(failure != null) {
- newTxFutureCallback.onComplete(failure, null);
- } else {
- newTxFutureCallback.setPrimaryShard(primaryShard);
- }
- }
- }, actorContext.getActorSystem().dispatcher());
+ try {
+ if(!operationLimiter.tryAcquire(acquirePermits,
+ actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+ }
+ } catch (InterruptedException e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
+ } else {
+ LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+ }
}
-
- return txFutureCallback;
- }
-
- public String getTransactionChainId() {
- return transactionChainId;
}
- /**
- * Interface for a transaction operation to be invoked later.
- */
- private static interface TransactionOperation {
- void invoke(TransactionContext transactionContext);
+ final void ensureInitializied() {
+ Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
}
- /**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
- *
- * The end result from a completed CreateTransaction message is a TransactionContext that is
- * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
- */
- private class TransactionFutureCallback extends OnComplete