BUG-5280: add SimpleDataStoreClientBehavior
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractReceivingClientConnection.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import java.util.ArrayDeque;
14 import java.util.Iterator;
15 import java.util.Optional;
16 import java.util.Queue;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.ABIVersion;
19 import org.opendaylight.controller.cluster.access.concepts.Request;
20 import org.opendaylight.controller.cluster.access.concepts.RequestException;
21 import org.opendaylight.controller.cluster.access.concepts.Response;
22 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.FiniteDuration;
26
27 /**
28  * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
29  * sublcasses. It allows us to share some code.
30  *
31  * @author Robert Varga
32  *
33  * @param <T> Concrete {@link BackendInfo} type
34  */
35 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
37
38     private final Queue<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
39     private final T backend;
40
41     private long lastProgress;
42
43     AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
44         super(context, cookie);
45         this.backend = Preconditions.checkNotNull(backend);
46         this.lastProgress = readTime();
47     }
48
49     AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
50         super(oldConnection);
51         this.backend = oldConnection.backend;
52         this.lastProgress = oldConnection.lastProgress;
53     }
54
55     @Override
56     public final Optional<T> getBackendInfo() {
57         return Optional.of(backend);
58     }
59
60     final ActorRef remoteActor() {
61         return backend.getActor();
62     }
63
64     final int remoteMaxMessages() {
65         return backend.getMaxMessages();
66     }
67
68     final ABIVersion remoteVersion() {
69         return backend.getVersion();
70     }
71
72     final long sessionId() {
73         return backend.getSessionId();
74     }
75
76     final int inflightSize() {
77         return inflight.size();
78     }
79
80     final void appendToInflight(final TransmittedConnectionEntry entry) {
81         // This should never fail
82         inflight.add(entry);
83     }
84
85     @GuardedBy("this")
86     @Override
87     void spliceToSuccessor(final ReconnectForwarder successor) {
88         ConnectionEntry entry = inflight.poll();
89         while (entry != null) {
90             successor.forwardEntry(entry);
91             entry = inflight.poll();
92         }
93
94         super.spliceToSuccessor(successor);
95     }
96
97     @Override
98     void receiveResponse(final ResponseEnvelope<?> envelope) {
99         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
100         if (maybeEntry == null) {
101             LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
102             maybeEntry = findMatchingEntry(pending(), envelope);
103         }
104
105         if (maybeEntry == null || !maybeEntry.isPresent()) {
106             LOG.warn("No request matching {} found, ignoring response", envelope);
107             return;
108         }
109
110         lastProgress = readTime();
111
112         final TransmittedConnectionEntry entry = maybeEntry.get();
113         LOG.debug("Completing {} with {}", entry, envelope);
114         entry.complete(envelope.getMessage());
115
116         // We have freed up a slot, try to transmit something
117         final int toSend = remoteMaxMessages() - inflight.size();
118         if (toSend > 0) {
119             sendMessages(toSend);
120         }
121     }
122
123     @Override
124     boolean isEmpty() {
125         return inflight.isEmpty() && super.isEmpty();
126     }
127
128     @Override
129     void poison(final RequestException cause) {
130         super.poison(cause);
131         poisonQueue(inflight, cause);
132     }
133
134     /**
135      * Transmit a given number of messages.
136      *
137      * @param count Number of messages to transmit, guaranteed to be positive.
138      */
139     abstract void sendMessages(int count);
140
141     /*
142      * We are using tri-state return here to indicate one of three conditions:
143      * - if a matching entry is found, return an Optional containing it
144      * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
145      * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
146      */
147     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
148             justification = "Returning null Optional is documented in the API contract.")
149     private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
150             final ResponseEnvelope<?> envelope) {
151         // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
152         // to use an iterator
153         final Iterator<? extends ConnectionEntry> it = queue.iterator();
154         while (it.hasNext()) {
155             final ConnectionEntry e = it.next();
156             final Request<?, ?> request = e.getRequest();
157             final Response<?, ?> response = envelope.getMessage();
158
159             // First check for matching target, or move to next entry
160             if (!request.getTarget().equals(response.getTarget())) {
161                 continue;
162             }
163
164             // Sanity-check logical sequence, ignore any out-of-order messages
165             if (request.getSequence() != response.getSequence()) {
166                 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
167                 return Optional.empty();
168             }
169
170             // Check if the entry has (ever) been transmitted
171             if (!(e instanceof TransmittedConnectionEntry)) {
172                 return Optional.empty();
173             }
174
175             final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
176
177             // Now check session match
178             if (envelope.getSessionId() != te.getSessionId()) {
179                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
180                 return Optional.empty();
181             }
182             if (envelope.getTxSequence() != te.getTxSequence()) {
183                 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
184                 return Optional.empty();
185             }
186
187             LOG.debug("Completing request {} with {}", request, envelope);
188             it.remove();
189             return Optional.of(te);
190         }
191
192         return null;
193     }
194
195     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
196             justification = "Returning null Optional is documented in the API contract.")
197     @Override
198     final Optional<FiniteDuration> checkTimeout(final long now) {
199         final Optional<FiniteDuration> xmit = checkTimeout(inflight.peek(), now);
200         if (xmit == null) {
201             return null;
202         }
203         final Optional<FiniteDuration> pend = super.checkTimeout(now);
204         if (pend == null) {
205             return null;
206         }
207         if (!xmit.isPresent()) {
208             return pend;
209         }
210         if (!pend.isPresent()) {
211             return xmit;
212         }
213
214         return Optional.of(xmit.get().min(pend.get()));
215     }
216 }