*/
public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
final long now = currentTime();
- long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
- try {
- if (delay >= DEBUG_DELAY_NANOS) {
- if (delay > MAX_DELAY_NANOS) {
- LOG.info("Capping {} throttle delay from {} to {} seconds", this,
- TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
- delay = MAX_DELAY_NANOS;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
- }
- }
- TimeUnit.NANOSECONDS.sleep(delay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
- }
+ sendEntry(new ConnectionEntry(request, callback, now), now);
}
/**
enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
}
+ public final long enqueueEntry(final ConnectionEntry entry, final long now) {
+ lock.lock();
+ try {
+ final RequestException maybePoison = poisoned;
+ if (maybePoison != null) {
+ throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ }
+
+ if (queue.isEmpty()) {
+ // The queue is becoming non-empty, schedule a timer.
+ scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+ }
+ return queue.enqueue(entry, now);
+ } finally {
+ lock.unlock();
+ }
+ }
+
public abstract Optional<T> getBackendInfo();
final Collection<ConnectionEntry> startReplay() {
abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
RequestException runtimeRequestException);
- final long enqueueEntry(final ConnectionEntry entry, final long now) {
- lock.lock();
+ final void sendEntry(final ConnectionEntry entry, final long now) {
+ long delay = enqueueEntry(entry, now);
try {
- final RequestException maybePoison = poisoned;
- if (maybePoison != null) {
- throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
- }
-
- if (queue.isEmpty()) {
- // The queue is becoming non-empty, schedule a timer.
- scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+ if (delay >= DEBUG_DELAY_NANOS) {
+ if (delay > MAX_DELAY_NANOS) {
+ LOG.info("Capping {} throttle delay from {} to {} seconds", this,
+ TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
+ delay = MAX_DELAY_NANOS;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(),
+ TimeUnit.NANOSECONDS.toMillis(delay), this);
+ }
}
- return queue.enqueue(entry, now);
- } finally {
- lock.unlock();
+ TimeUnit.NANOSECONDS.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
}
}
@GuardedBy("lock")
private void scheduleTimer(final long delay) {
if (haveTimer) {
- LOG.debug("{}: timer already scheduled", context.persistenceId());
+ LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
return;
}
if (queue.hasSuccessor()) {
- LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
+ LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this);
return;
}
final long normalized = delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
- LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), dur);
+ LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
context.executeInActor(this::runTimer, dur);
haveTimer = true;
}
try {
haveTimer = false;
final long now = currentTime();
+
+ LOG.debug("{}: running timer on {}", context.persistenceId(), this);
+
// The following line is only reliable when queue is not forwarding, but such state should not last long.
// FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
final long ticksSinceProgress = queue.ticksStalling(now);
delay = lockedCheckTimeout(now);
if (delay == null) {
// We have timed out. There is no point in scheduling a timer
+ LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
new TimeoutException()));
}
if (delay.isPresent()) {
// If there is new delay, schedule a timer
scheduleTimer(delay.get());
+ } else {
+ LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
}
} finally {
lock.unlock();
@GuardedBy("lock")
private Optional<Long> lockedCheckTimeout(final long now) {
if (queue.isEmpty()) {
+ LOG.debug("{}: connection {} is empty", context.persistenceId(), this);
return Optional.empty();
}
final long backendSilentTicks = backendSilentTicks(now);
if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
- LOG.debug("Connection {} has not seen activity from backend for {} nanoseconds, timing out", this,
- backendSilentTicks);
+ LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
+ context.persistenceId(), this, backendSilentTicks);
return null;
}
tasksTimedOut++;
queue.remove(now);
- LOG.debug("Connection {} timed out entryt {}", this, head);
+ LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
head.complete(head.getRequest().toRequestFailure(
new RequestTimeoutException("Timed out after " + beenOpen + "ns")));
}
package org.opendaylight.controller.cluster.access.client;
import com.google.common.base.Preconditions;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
this.successor = Preconditions.checkNotNull(successor);
}
- protected final void sendToSuccessor(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
- successor.sendRequest(request, callback);
+ protected final void sendToSuccessor(final ConnectionEntry entry) {
+ successor.sendRequest(entry.getRequest(), entry.getCallback());
+ }
+
+ protected final void replayToSuccessor(final ConnectionEntry entry) {
+ successor.enqueueRequest(entry.getRequest(), entry.getCallback(), entry.getEnqueuedTicks());
}
protected abstract void forwardEntry(ConnectionEntry entry, long now);
+ protected abstract void replayEntry(ConnectionEntry entry, long now);
+
final AbstractReceivingClientConnection<?> successor() {
return successor;
}
@Override
protected void forwardEntry(final ConnectionEntry entry, final long now) {
- // We are ignoring requested delay, as we have already paid the admission delay
+ successor().sendEntry(entry, now);
+ }
+
+ @Override
+ protected void replayEntry(final ConnectionEntry entry, final long now) {
+ // We are executing in the context of the client thread, do not block
successor().enqueueEntry(entry, now);
}
}
*/
final long enqueue(final ConnectionEntry entry, final long now) {
if (successor != null) {
+ // This call will pay the enqueuing price, hence the caller does not have to
successor.forwardEntry(entry, now);
return 0;
}
int count = 0;
ConnectionEntry entry = inflight.poll();
while (entry != null) {
- successor.forwardEntry(entry, now);
+ successor.replayEntry(entry, now);
entry = inflight.poll();
count++;
}
entry = pending.poll();
while (entry != null) {
- successor.forwardEntry(entry, now);
+ successor.replayEntry(entry, now);
entry = pending.poll();
count++;
}
for (Object obj : successfulRequests) {
if (obj instanceof TransactionRequest) {
LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
- successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+ successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { }, now);
} else {
Verify.verify(obj instanceof IncrementSequence);
final IncrementSequence increment = (IncrementSequence) obj;
- successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+ successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { },
now);
LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
if (getIdentifier().equals(req.getTarget())) {
Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
LOG.debug("Replaying queued request {} to successor {}", req, successor);
- successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
+ successor.doReplayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
it.remove();
}
}
* @param callback Callback to be invoked once the request completes
* @param enqueuedTicks ticker-based time stamp when the request was enqueued
*/
- private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ private void doReplayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
final long enqueuedTicks) {
if (request instanceof AbstractLocalTransactionRequest) {
handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
}
}
+ final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks);
+ }
+
abstract boolean isSnapshotOnly();
abstract void doDelete(YangInstanceIdentifier path);
@Override
protected void forwardEntry(final ConnectionEntry entry, final long now) {
+ try {
+ findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+ } catch (RequestException e) {
+ entry.complete(entry.getRequest().toRequestFailure(e));
+ }
+ }
+
+ @Override
+ protected void replayEntry(final ConnectionEntry entry, final long now) {
+ try {
+ findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+ } catch (RequestException e) {
+ entry.complete(entry.getRequest().toRequestFailure(e));
+ }
+ }
+
+ private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
final Request<? , ?> request = entry.getRequest();
final LocalHistoryIdentifier historyId;
throw new IllegalArgumentException("Unhandled request " + request);
}
- try {
- final ProxyReconnectCohort cohort = cohorts.get(historyId);
- if (cohort == null) {
- LOG.warn("Cohort for request {} not found, aborting it", request);
- throw new CohortNotFoundException(historyId);
- }
-
- cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
- } catch (RequestException e) {
- entry.complete(request.toRequestFailure(e));
+ final ProxyReconnectCohort cohort = cohorts.get(historyId);
+ if (cohort == null) {
+ LOG.warn("Cohort for request {} not found, aborting it", request);
+ throw new CohortNotFoundException(historyId);
}
+
+ return cohort;
}
-}
\ No newline at end of file
+}
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
}
@Override
- void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
- final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
- // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
- // period required to get into the queue.
+ void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+ throws RequestException {
+ final Request<?, ?> request = entry.getRequest();
if (request instanceof TransactionRequest) {
- forwardTransactionRequest((TransactionRequest<?>) request, callback);
+ lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+ entry.getEnqueuedTicks());
} else if (request instanceof LocalHistoryRequest) {
- forwardTo.accept(request, callback);
+ replayTo.accept(entry);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
- private void forwardTransactionRequest(final TransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) throws RequestException {
+ @Override
+ void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+ throws RequestException {
+ final Request<?, ?> request = entry.getRequest();
+ if (request instanceof TransactionRequest) {
+ lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+ } else if (request instanceof LocalHistoryRequest) {
+ forwardTo.accept(entry);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+ private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+ throws RequestReplayException {
final AbstractProxyTransaction proxy;
lock.lock();
try {
} finally {
lock.unlock();
}
- if (proxy == null) {
- throw new RequestReplayException("Failed to find proxy for %s", request);
+ if (proxy != null) {
+ return proxy;
}
- proxy.forwardRequest(request, callback);
+ throw new RequestReplayException("Failed to find proxy for %s", request);
}
}
package org.opendaylight.controller.cluster.databroker.actors.dds;
import java.util.Collection;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.yangtools.concepts.Identifiable;
abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
abstract ProxyHistory finishReconnect();
- abstract void forwardRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
- BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
+ abstract void replayEntry(ConnectionEntry entry, Consumer<ConnectionEntry> replayTo) throws RequestException;
+
+ abstract void forwardEntry(ConnectionEntry entry, Consumer<ConnectionEntry> forwardTo) throws RequestException;
}