import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+ Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
}
+
+ @Override
+ void preComplete(ResponseEnvelope<?> envelope) {
+ }
}
static final class Transmitting extends TransmitQueue {
+ private static final long NOT_SLICING = -1;
+
private final BackendInfo backend;
+ private final MessageSlicer messageSlicer;
private long nextTxSequence;
+ private long currentSlicedEnvSequenceId = NOT_SLICING;
// For ConnectedClientConnection.
- Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+ Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
+ final MessageSlicer messageSlicer) {
super(oldQueue, targetDepth, now);
this.backend = Preconditions.checkNotNull(backend);
+ this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
}
@Override
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
- final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+ Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
+ // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
+ // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
+ // Optional to indicate the request was not transmitted.
+ if (currentSlicedEnvSequenceId >= 0) {
+ return Optional.empty();
+ }
+
+ final Request<?, ?> request = entry.getRequest();
+ final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
backend.getSessionId(), nextTxSequence++);
- final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
- env.getTxSequence(), now);
- backend.getActor().tell(env, ActorRef.noSender());
- return ret;
+ if (request instanceof SliceableMessage) {
+ if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
+ .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
+ .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
+ "Failed to slice request " + request, t), 0L)).build())) {
+ // The request was sliced so record the envelope sequence id to prevent transmitting
+ // subsequent requests until slicing completes.
+ currentSlicedEnvSequenceId = env.getTxSequence();
+ }
+ } else {
+ backend.getActor().tell(env, ActorRef.noSender());
+ }
+
+ return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
+ env.getTxSequence(), now));
+ }
+
+ @Override
+ void preComplete(ResponseEnvelope<?> envelope) {
+ if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
+ // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
+ // requests to be transmitted.
+ currentSlicedEnvSequenceId = NOT_SLICING;
+ }
}
}
// If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
+ preComplete(envelope);
+
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
private void transmitEntries(final int maxTransmit, final long now) {
for (int i = 0; i < maxTransmit; ++i) {
final ConnectionEntry e = pending.poll();
- if (e == null) {
+ if (e == null || !transmitEntry(e, now)) {
LOG.debug("Queue {} transmitted {} requests", this, i);
return;
}
-
- transmitEntry(e, now);
}
LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
}
- private void transmitEntry(final ConnectionEntry entry, final long now) {
+ private boolean transmitEntry(final ConnectionEntry entry, final long now) {
LOG.debug("Queue {} transmitting entry {}", this, entry);
// We are not thread-safe and are supposed to be externally-guarded,
// hence send-before-record should be fine.
// This needs to be revisited if the external guards are lowered.
- inflight.addLast(transmit(entry, now));
+ final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
+ if (!maybeTransmitted.isPresent()) {
+ return false;
+ }
+
+ inflight.addLast(maybeTransmitted.get());
+ return true;
}
final long enqueueOrForward(final ConnectionEntry entry, final long now) {
}
if (pending.isEmpty()) {
- transmitEntry(entry, now);
+ if (!transmitEntry(entry, now)) {
+ LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
+ pending.addLast(entry);
+ }
+
return delay;
}
*/
abstract int canTransmitCount(int inflightSize);
- abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+ abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
+
+ abstract void preComplete(ResponseEnvelope<?> envelope);
final boolean isEmpty() {
return inflight.isEmpty() && pending.isEmpty();