It seems we are getting stuck after replay on purge requests,
which are dispatched internally.
Make sure we do not use sendRequest() in obvious replay places,
nor for purge requests. Also add a debug upcall if we happen to
sleep for more than 100msec.
Change-Id: Iec667f2039610f3f036e6b88c7c7e7b773cdfc19
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
20ece8c549211d1c453f1763132bb0a0ca7be0e0)
@VisibleForTesting
static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
@VisibleForTesting
static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+ // Emit a debug entry if we sleep for more that this amount
+ private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+
private final Lock lock = new ReentrantLock();
private final ClientActorContext context;
@GuardedBy("lock")
private final Lock lock = new ReentrantLock();
private final ClientActorContext context;
@GuardedBy("lock")
final long now = currentTime();
final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
try {
final long now = currentTime();
final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
try {
+ if (delay >= DEBUG_DELAY_NANOS && LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
+ }
TimeUnit.NANOSECONDS.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
TimeUnit.NANOSECONDS.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendRequest(abortRequest(), resp -> {
LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
sendRequest(abortRequest(), resp -> {
LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} abort completed", this);
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} abort completed", this);
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} directCommit completed", this);
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} directCommit completed", this);
}
LOG.debug("Transaction {} doCommit completed", this);
}
LOG.debug("Transaction {} doCommit completed", this);
- private void sendPurge() {
- sendPurge(null);
+ private void enqueuePurge() {
+ enqueuePurge(null);
- final void sendPurge(final Consumer<Response<?, ?>> callback) {
- sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+ final void enqueuePurge(final Consumer<Response<?, ?>> callback) {
+ // Purge request are dispatched internally, hence should not wait
+ enqueuePurge(callback, parent.currentTime());
}
final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
}
final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
+ enqueueRequest(purgeRequest(), resp -> {
+ LOG.debug("Transaction {} purge completed", this);
+ parent.completeTransaction(this);
+ if (callback != null) {
+ callback.accept(resp);
+ }
+ }, enqueuedTicks);
}
private TransactionPurgeRequest purgeRequest() {
}
private TransactionPurgeRequest purgeRequest() {
return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
}
return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
}
- private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
- LOG.debug("Transaction {} purge completed", this);
- parent.completeTransaction(this);
- if (callback != null) {
- callback.accept(resp);
- }
- }
-
// Called with the connection unlocked
final synchronized void startReconnect() {
// At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
// Called with the connection unlocked
final synchronized void startReconnect() {
// At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
} else if (handleReadRequest(request, callback)) {
// No-op
} else if (request instanceof TransactionPurgeRequest) {
+ enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
successor.abort();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
- successor.sendPurge(callback);
+ successor.enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
if (request instanceof AbortLocalTransactionRequest) {
successor.sendAbort(request, callback);
} else if (request instanceof TransactionPurgeRequest) {
- successor.sendPurge(callback);
+ successor.enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof CreateLocalHistoryRequest) {
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof CreateLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof DestroyLocalHistoryRequest) {
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof DestroyLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
ensureFlushedBuider();
sendDoAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
ensureFlushedBuider();
sendDoAbort(callback);
} else if (request instanceof TransactionPurgeRequest) {
+ enqueuePurge(callback);
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request {}" + request);
}