2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import akka.actor.ActorRef;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import java.util.Optional;
14 import org.opendaylight.controller.cluster.access.concepts.Request;
15 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
16 import org.opendaylight.controller.cluster.access.concepts.RequestException;
17 import org.opendaylight.controller.cluster.access.concepts.Response;
18 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
23 * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
25 * @author Robert Varga
27 * @param <I> Target identifier type
29 final class SequencedQueueEntry {
30 private static final class LastTry {
34 LastTry(final long retry, final long timeTicks) {
36 this.timeTicks = timeTicks;
40 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
42 private final Request<?, ?> request;
43 private final RequestCallback callback;
44 private final long enqueuedTicks;
45 private final long sequence;
47 private Optional<LastTry> lastTry = Optional.empty();
49 SequencedQueueEntry(final Request<?, ?> request, final long sequence, final RequestCallback callback,
51 this.request = Preconditions.checkNotNull(request);
52 this.callback = Preconditions.checkNotNull(callback);
53 this.enqueuedTicks = now;
54 this.sequence = sequence;
61 boolean acceptsResponse(final ResponseEnvelope<?> response) {
62 return getSequence() == response.getSequence() && request.getTarget().equals(response.getMessage().getTarget());
65 long getCurrentTry() {
66 return lastTry.isPresent() ? lastTry.get().retry : 0;
69 ClientActorBehavior complete(final Response<?, ?> response) {
70 LOG.debug("Completing request {} with {}", request, response);
71 return callback.complete(response);
74 void poison(final RequestException cause) {
75 LOG.trace("Poisoning request {}", request, cause);
76 callback.complete(request.toRequestFailure(cause));
79 boolean isTimedOut(final long now, final long timeoutNanos) {
82 if (lastTry.isPresent()) {
83 elapsed = now - lastTry.get().timeTicks;
85 elapsed = now - enqueuedTicks;
88 if (elapsed >= timeoutNanos) {
89 LOG.debug("Request {} timed out after {}ns", request, elapsed);
96 void retransmit(final BackendInfo backend, final long now) {
97 final long retry = lastTry.isPresent() ? lastTry.get().retry + 1 : 0;
98 final RequestEnvelope toSend = new RequestEnvelope(request.toVersion(backend.getVersion()), sequence, retry);
100 final ActorRef actor = backend.getActor();
101 LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
102 actor.tell(toSend, ActorRef.noSender());
103 lastTry = Optional.of(new LastTry(retry, now));
107 public String toString() {
108 return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();