final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
final String shardName) {
final TransactionContextWrapper transactionContextWrapper =
- new TransactionContextWrapper(parent.getIdentifier(), actorContext);
+ new TransactionContextWrapper(parent.getIdentifier(), actorContext, shardName);
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
if (findPrimaryFuture.isCompleted()) {
if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
return true;
}
-
- LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
this.semaphore.release(permits);
}
- public TransactionIdentifier getIdentifier() {
+ @VisibleForTesting
+ TransactionIdentifier getIdentifier() {
return identifier;
}
* @return True if a permit was successfully acquired, false otherwise
*/
private boolean acquireOperation() {
- return isOperationHandOffComplete() && limiter.acquire();
+ if (isOperationHandOffComplete()) {
+ if (limiter.acquire()) {
+ return true;
+ }
+
+ LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(),
+ actor);
+ }
+
+ return false;
}
@Override
*/
@GuardedBy("queuedTxOperations")
private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
-
private final TransactionIdentifier identifier;
+ private final String shardName;
/**
* The resulting TransactionContext.
private final OperationLimiter limiter;
- TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) {
+ TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
+ final String shardName) {
this.identifier = Preconditions.checkNotNull(identifier);
this.limiter = new OperationLimiter(identifier,
// 1 extra permit for the ready operation
actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
+ this.shardName = Preconditions.checkNotNull(shardName);
}
TransactionContext getTransactionContext() {
final boolean invokeOperation;
synchronized (queuedTxOperations) {
if (transactionContext == null) {
- LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
+ LOG.debug("Tx {} Queuing TransactionOperation", identifier);
queuedTxOperations.add(operation);
invokeOperation = false;
if (invokeOperation) {
operation.invoke(transactionContext);
} else {
- limiter.acquire();
+ if (!limiter.acquire()) {
+ LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
+ shardName);
+ }
}
}
MockitoAnnotations.initMocks(this);
doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
transactionContextWrapper = new TransactionContextWrapper(MockIdentifiers.transactionIdentifier(
- TransactionContextWrapperTest.class, "mock"), actorContext);
+ TransactionContextWrapperTest.class, "mock"), actorContext, "mock");
}
@Test