2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.actors.client;
10 import akka.actor.ActorRef;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import java.util.Optional;
14 import org.opendaylight.controller.cluster.access.concepts.Request;
15 import org.opendaylight.controller.cluster.access.concepts.RequestException;
16 import org.opendaylight.controller.cluster.access.concepts.Response;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
21 * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
23 * @author Robert Varga
25 * @param <I> Target identifier type
27 final class SequencedQueueEntry {
28 private static final class LastTry {
29 final Request<?, ?> request;
32 LastTry(final Request<?, ?> request, final long when) {
33 this.request = Preconditions.checkNotNull(request);
34 this.timeTicks = when;
38 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
40 private final Request<?, ?> request;
41 private final RequestCallback callback;
42 private final long enqueuedTicks;
44 private Optional<LastTry> lastTry = Optional.empty();
46 SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
47 this.request = Preconditions.checkNotNull(request);
48 this.callback = Preconditions.checkNotNull(callback);
49 this.enqueuedTicks = now;
53 return request.getSequence();
56 boolean acceptsResponse(final Response<?, ?> response) {
57 return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
60 long getCurrentTry() {
61 final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
62 return req.getRetry();
65 ClientActorBehavior complete(final Response<?, ?> response) {
66 LOG.debug("Completing request {} with {}", request, response);
67 return callback.complete(response);
70 void poison(final RequestException cause) {
71 LOG.trace("Poisoning request {}", request, cause);
72 callback.complete(request.toRequestFailure(cause));
75 boolean isTimedOut(final long now, final long timeoutNanos) {
76 final Request<?, ?> req;
79 if (lastTry.isPresent()) {
80 final LastTry t = lastTry.get();
81 elapsed = now - t.timeTicks;
84 elapsed = now - enqueuedTicks;
88 if (elapsed >= timeoutNanos) {
89 LOG.debug("Request {} timed out after {}ns", req, elapsed);
96 void retransmit(final BackendInfo backend, final long now) {
97 final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
98 final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
99 final ActorRef actor = backend.getActor();
101 LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
102 actor.tell(toSend, ActorRef.noSender());
103 lastTry = Optional.of(new LastTry(toSend, now));
107 public String toString() {
108 return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();