BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / SequencedQueueEntry.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.actors.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import java.util.Optional;
14 import org.opendaylight.controller.cluster.access.concepts.Request;
15 import org.opendaylight.controller.cluster.access.concepts.RequestException;
16 import org.opendaylight.controller.cluster.access.concepts.Response;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 /**
21  * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
22  *
23  * @author Robert Varga
24  *
25  * @param <I> Target identifier type
26  */
27 final class SequencedQueueEntry {
28     private static final class LastTry {
29         final Request<?, ?> request;
30         final long timeTicks;
31
32         LastTry(final Request<?, ?> request, final long when) {
33             this.request = Preconditions.checkNotNull(request);
34             this.timeTicks = when;
35         }
36     }
37
38     private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
39
40     private final Request<?, ?> request;
41     private final RequestCallback callback;
42     private final long enqueuedTicks;
43
44     private Optional<LastTry> lastTry = Optional.empty();
45
46     SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
47         this.request = Preconditions.checkNotNull(request);
48         this.callback = Preconditions.checkNotNull(callback);
49         this.enqueuedTicks = now;
50     }
51
52     long getSequence() {
53         return request.getSequence();
54     }
55
56     boolean acceptsResponse(final Response<?, ?> response) {
57         return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
58     }
59
60     long getCurrentTry() {
61         final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
62         return req.getRetry();
63      }
64
65     ClientActorBehavior complete(final Response<?, ?> response) {
66         LOG.debug("Completing request {} with {}", request, response);
67         return callback.complete(response);
68     }
69
70     void poison(final RequestException cause) {
71         LOG.trace("Poisoning request {}", request, cause);
72         callback.complete(request.toRequestFailure(cause));
73     }
74
75     boolean isTimedOut(final long now, final long timeoutNanos) {
76         final Request<?, ?> req;
77         final long elapsed;
78
79         if (lastTry.isPresent()) {
80             final LastTry t = lastTry.get();
81             elapsed = now - t.timeTicks;
82             req = t.request;
83         } else {
84             elapsed = now - enqueuedTicks;
85             req = request;
86         }
87
88         if (elapsed >= timeoutNanos) {
89             LOG.debug("Request {} timed out after {}ns", req, elapsed);
90             return true;
91         } else {
92             return false;
93         }
94     }
95
96     void retransmit(final BackendInfo backend, final long now) {
97         final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
98         final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
99         final ActorRef actor = backend.getActor();
100
101         LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
102         actor.tell(toSend, ActorRef.noSender());
103         lastTry = Optional.of(new LastTry(toSend, now));
104     }
105
106     @Override
107     public String toString() {
108         return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
109     }
110 }