BUG-3219: Fix OutboundQueue request error reporting
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
16 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 final class OutboundQueueImpl implements OutboundQueue {
23     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
24     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
25             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
26     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
27             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
28     private final OutboundQueueManager<?> manager;
29     private final OutboundQueueEntry[] queue;
30     private final long baseXid;
31     private final long endXid;
32     private final int reserve;
33
34     // Updated concurrently
35     private volatile int barrierOffset = -1;
36     private volatile int reserveOffset = 0;
37
38     // Updated from Netty only
39     private int flushOffset;
40     private int completeCount;
41     private int lastBarrierOffset = -1;
42
43     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
44         /*
45          * We use the last entry as an emergency should a timeout-triggered
46          * flush request race with normal users for the last entry in this
47          * queue. In that case the flush request will take the last entry and
48          * schedule a flush, which means that we will get around sending the
49          * message as soon as the user finishes the reservation.
50          */
51         Preconditions.checkArgument(maxQueue > 1);
52         this.baseXid = baseXid;
53         this.endXid = baseXid + maxQueue;
54         this.reserve = maxQueue - 1;
55         this.manager = Preconditions.checkNotNull(manager);
56         queue = new OutboundQueueEntry[maxQueue];
57         for (int i = 0; i < maxQueue; ++i) {
58             queue[i] = new OutboundQueueEntry();
59         }
60     }
61
62     private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
63         this.manager = Preconditions.checkNotNull(manager);
64         this.queue = Preconditions.checkNotNull(queue);
65         this.baseXid = baseXid;
66         this.endXid = baseXid + queue.length;
67         this.reserve = queue.length - 1;
68         for (OutboundQueueEntry element : queue) {
69             element.reset();
70         }
71     }
72
73     OutboundQueueImpl reuse(final long baseXid) {
74         return new OutboundQueueImpl(manager, baseXid, queue);
75     }
76
77     @Override
78     public Long reserveEntry() {
79         return reserveEntry(false);
80     }
81
82     @Override
83     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
84         final int offset = (int)(xid - baseXid);
85         if (message != null) {
86             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
87         }
88
89         final OutboundQueueEntry entry = queue[offset];
90         entry.commit(message, callback);
91         LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
92
93         if (entry.isBarrier()) {
94             int my = offset;
95             for (;;) {
96                 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
97                 if (prev < my) {
98                     LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
99                     break;
100                 }
101
102                 // We have traveled back, recover
103                 my = prev;
104             }
105         }
106
107         manager.ensureFlushing(this);
108     }
109
110     private Long reserveEntry(final boolean forBarrier) {
111         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
112         if (offset >= reserve) {
113             if (forBarrier) {
114                 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
115                 return endXid;
116             } else {
117                 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
118                 return null;
119             }
120         }
121
122         final Long xid = baseXid + offset;
123         LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
124         return xid;
125     }
126
127     Long reserveBarrierIfNeeded() {
128         final int bo = barrierOffset;
129         if (bo >= flushOffset) {
130             LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
131             return null;
132         } else {
133             return reserveEntry(true);
134         }
135     }
136
137     /**
138      * An empty queue is a queue which has no further unflushed entries.
139      *
140      * @return True if this queue does not have unprocessed entries.
141      */
142     boolean isEmpty() {
143         int ro = reserveOffset;
144         if (ro >= reserve) {
145             if (queue[reserve].isCommitted()) {
146                 ro = reserve + 1;
147             } else {
148                 ro = reserve;
149             }
150         }
151
152         LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
153         return ro <= flushOffset;
154     }
155
156     /**
157      * A queue is finished when all of its entries have been completed.
158      *
159      * @return False if there are any uncompleted requests.
160      */
161     boolean isFinished() {
162         if (completeCount < reserve) {
163             return false;
164         }
165
166         // We need to check if the last entry was used
167         final OutboundQueueEntry last = queue[reserve];
168         return !last.isCommitted() || last.isCompleted();
169     }
170
171     boolean isFlushed() {
172         LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
173         if (flushOffset < reserve) {
174             return false;
175         }
176
177         // flushOffset implied == reserve
178         return flushOffset >= queue.length || !queue[reserve].isCommitted();
179     }
180
181     OfHeader flushEntry() {
182         for (;;) {
183             // No message ready
184             if (isEmpty()) {
185                 LOG.trace("Flushed all reserved entries up to ", flushOffset);
186                 return null;
187             }
188
189             final OutboundQueueEntry entry = queue[flushOffset];
190             if (!entry.isCommitted()) {
191                 LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
192                 return null;
193             }
194
195             final OfHeader msg = entry.getMessage();
196             flushOffset++;
197             if (msg != null) {
198                 return msg;
199             }
200
201             LOG.trace("Null message, skipping to offset {}", flushOffset);
202         }
203     }
204
205     // Argument is 'long' to explicitly convert before performing operations
206     private boolean xidInRange(final long xid) {
207         return xid < endXid && (xid >= baseXid || baseXid > endXid);
208     }
209
210     private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
211         if (response instanceof Error) {
212             final Error err = (Error)response;
213             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
214             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
215             return true;
216         } else {
217             return entry.complete(response);
218         }
219     }
220
221     /**
222      * Return the request entry corresponding to a response. Returns null
223      * if there is no request matching the response.
224      *
225      * @param response Response message
226      * @return Matching request entry, or null if no match is found.
227      */
228     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
229         final Long xid = response.getXid();
230         if (!xidInRange(xid)) {
231             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
232             return null;
233         }
234
235         final int offset = (int)(xid - baseXid);
236         final OutboundQueueEntry entry = queue[offset];
237         if (entry.isCompleted()) {
238             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
239             return null;
240         }
241
242         if (entry.isBarrier()) {
243             // This has been a barrier -- make sure we complete all preceding requests.
244             // XXX: Barriers are expected to complete in one message.
245             //      If this assumption is changed, this logic will need to be expanded
246             //      to ensure that the requests implied by the barrier are reported as
247             //      completed *after* the barrier.
248             LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
249             completeRequests(offset);
250             lastBarrierOffset = offset;
251
252             final boolean success = completeEntry(entry, response);
253             Verify.verify(success, "Barrier request failed to complete");
254             completeCount++;
255         } else if (completeEntry(entry, response)) {
256             completeCount++;
257         }
258
259         return entry;
260     }
261
262     private void completeRequests(final int toOffset) {
263         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
264             final OutboundQueueEntry entry = queue[i];
265             if (!entry.isCompleted() && entry.complete(null)) {
266                 completeCount++;
267             }
268         }
269     }
270
271     void completeAll() {
272         completeRequests(queue.length);
273     }
274
275     int failAll(final Throwable cause) {
276         int ret = 0;
277         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
278             final OutboundQueueEntry entry = queue[i];
279             if (!entry.isCompleted()) {
280                 entry.fail(cause);
281                 ret++;
282             }
283         }
284
285         return ret;
286     }
287 }