import com.google.common.base.Preconditions;
import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
final class SequencedQueueEntry {
private static final class LastTry {
- final Request<?, ?> request;
final long timeTicks;
+ final long retry;
- LastTry(final Request<?, ?> request, final long when) {
- this.request = Preconditions.checkNotNull(request);
- this.timeTicks = when;
+ LastTry(final long retry, final long timeTicks) {
+ this.retry = retry;
+ this.timeTicks = timeTicks;
}
}
private final Request<?, ?> request;
private final RequestCallback callback;
private final long enqueuedTicks;
+ private final long sequence;
private Optional<LastTry> lastTry = Optional.empty();
- SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
+ SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+ final long now) {
this.request = Preconditions.checkNotNull(request);
this.callback = Preconditions.checkNotNull(callback);
this.enqueuedTicks = now;
+ this.sequence = sequence;
}
long getSequence() {
- return request.getSequence();
+ return sequence;
}
- boolean acceptsResponse(final Response<?, ?> response) {
- return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
+ boolean acceptsResponse(final ResponseEnvelope<?> response) {
+ return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
}
long getCurrentTry() {
- final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
- return req.getRetry();
+ return lastTry.isPresent() ? lastTry.get().retry : 0;
}
ClientActorBehavior complete(final Response<?, ?> response) {
}
boolean isTimedOut(final long now, final long timeoutNanos) {
- final Request<?, ?> req;
final long elapsed;
if (lastTry.isPresent()) {
- final LastTry t = lastTry.get();
- elapsed = now - t.timeTicks;
- req = t.request;
+ elapsed = now - lastTry.get().timeTicks;
} else {
elapsed = now - enqueuedTicks;
- req = request;
}
if (elapsed >= timeoutNanos) {
- LOG.debug("Request {} timed out after {}ns", req, elapsed);
+ LOG.debug("Request {} timed out after {}ns", request, elapsed);
return true;
} else {
return false;
}
void retransmit(final BackendInfo backend, final long now) {
- final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
- final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
- final ActorRef actor = backend.getActor();
+ final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
+ final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
+ final ActorRef actor = backend.getActor();
LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
actor.tell(toSend, ActorRef.noSender());
- lastTry = Optional.of(new LastTry(toSend, now));
+ lastTry = Optional.of(new LastTry(retry, now));
}
@Override