*/
@GuardedBy("queuedTxOperations")
private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
-
private final TransactionIdentifier identifier;
+ private final String shardName;
/**
* The resulting TransactionContext.
private final OperationLimiter limiter;
- TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) {
+ TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
+ final String shardName) {
this.identifier = Preconditions.checkNotNull(identifier);
this.limiter = new OperationLimiter(identifier,
// 1 extra permit for the ready operation
actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
+ this.shardName = Preconditions.checkNotNull(shardName);
}
TransactionContext getTransactionContext() {
final boolean invokeOperation;
synchronized (queuedTxOperations) {
if (transactionContext == null) {
- LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
+ LOG.debug("Tx {} Queuing TransactionOperation", identifier);
queuedTxOperations.add(operation);
invokeOperation = false;
if (invokeOperation) {
operation.invoke(transactionContext);
} else {
- limiter.acquire();
+ if (!limiter.acquire()) {
+ LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
+ shardName);
+ }
}
}