import akka.actor.ActorRef;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import java.util.Optional;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
*
* @author Robert Varga
- *
- * @param <I> Target identifier type
*/
final class SequencedQueueEntry {
- private static final class LastTry {
- final long timeTicks;
- final long retry;
-
- LastTry(final long retry, final long timeTicks) {
- this.retry = retry;
- this.timeTicks = timeTicks;
- }
- }
-
private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
private final Request<?, ?> request;
private final RequestCallback callback;
private final long enqueuedTicks;
- private final long sequence;
- private Optional<LastTry> lastTry = Optional.empty();
+ private TxDetails txDetails;
- SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
+ SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback,
final long now) {
this.request = Preconditions.checkNotNull(request);
this.callback = Preconditions.checkNotNull(callback);
this.enqueuedTicks = now;
- this.sequence = sequence;
}
- long getSequence() {
- return sequence;
+ Request<?, ?> getRequest() {
+ return request;
}
- boolean acceptsResponse(final ResponseEnvelope<?> response) {
- return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
+ @Nullable TxDetails getTxDetails() {
+ return txDetails;
}
- long getCurrentTry() {
- return lastTry.isPresent() ? lastTry.get().retry : 0;
- }
-
ClientActorBehavior complete(final Response<?, ?> response) {
LOG.debug("Completing request {} with {}", request, response);
return callback.complete(response);
boolean isTimedOut(final long now, final long timeoutNanos) {
final long elapsed;
- if (lastTry.isPresent()) {
- elapsed = now - lastTry.get().timeTicks;
+ if (txDetails != null) {
+ elapsed = now - txDetails.getTimeTicks();
} else {
elapsed = now - enqueuedTicks;
}
}
}
- void retransmit(final BackendInfo backend, final long now) {
- final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
- final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
+ void retransmit(final BackendInfo backend, final long txSequence, final long now) {
+ final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()),
+ backend.getSessionId(), txSequence);
final ActorRef actor = backend.getActor();
- LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
+ LOG.trace("Transmitting request {} as {} to {}", request, toSend, actor);
actor.tell(toSend, ActorRef.noSender());
- lastTry = Optional.of(new LastTry(retry, now));
+ txDetails = new TxDetails(backend.getSessionId(), txSequence, now);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
}
+
}