import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Optional;
return context.self();
}
+ public final long currentTime() {
+ return context.ticker().read();
+ }
+
/**
* Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
* from any thread.
* @param callback Callback to invoke
*/
public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
- final RequestException maybePoison = poisoned;
- if (maybePoison != null) {
- throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ final long now = currentTime();
+ final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
+ try {
+ TimeUnit.NANOSECONDS.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
}
+ }
- final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
- enqueueAndWait(entry, entry.getEnqueuedTicks());
+ /**
+ * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+ * from any thread.
+ *
+ * <p>
+ * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
+ * should never be called from an application thread.
+ *
+ * @param request Request to send
+ * @param callback Callback to invoke
+ * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue
+ */
+ public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
}
public abstract Optional<T> getBackendInfo();
@GuardedBy("lock")
final void setForwarder(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder, readTime());
+ queue.setForwarder(forwarder, currentTime());
}
@GuardedBy("lock")
abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
- private long readTime() {
- return context.ticker().read();
- }
-
final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
try {
+ final RequestException maybePoison = poisoned;
+ if (maybePoison != null) {
+ throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ }
+
if (queue.isEmpty()) {
// The queue is becoming non-empty, schedule a timer
scheduleTimer(REQUEST_TIMEOUT_DURATION);
}
}
- final void enqueueAndWait(final ConnectionEntry entry, final long now) {
- final long delay = enqueueEntry(entry, now);
- try {
- TimeUnit.NANOSECONDS.sleep(delay);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping", e);
- }
- }
-
final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
lock.lock();
try {
lock.lock();
try {
haveTimer = false;
- final long now = readTime();
+ final long now = currentTime();
// The following line is only reliable when queue is not forwarding, but such state should not last long.
final long ticksSinceProgress = queue.ticksStalling(now);
if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
}
final void receiveResponse(final ResponseEnvelope<?> envelope) {
- final long now = readTime();
+ final long now = currentTime();
final Optional<TransmittedConnectionEntry> maybeEntry;
lock.lock();
entry.complete(envelope.getMessage());
}
}
+
+ @Override
+ public final String toString() {
+ return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
+ }
+
+ ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ return toStringHelper.add("client", context.getIdentifier()).add("cookie", cookie).add("poisoned", poisoned);
+ }
}