Run flush immediately when channel becomes writable
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 final class OutboundQueueImpl implements OutboundQueue {
24     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
25     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
26             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
27     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
28             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
29     private final OutboundQueueManager<?> manager;
30     private final OutboundQueueEntry[] queue;
31     private final long baseXid;
32     private final long endXid;
33     private final int reserve;
34
35     // Updated concurrently
36     private volatile int barrierOffset = -1;
37     private volatile int reserveOffset = 0;
38
39     // Updated from Netty only
40     private int flushOffset;
41     private int completeCount;
42     private int lastBarrierOffset = -1;
43
44     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
45         /*
46          * We use the last entry as an emergency should a timeout-triggered
47          * flush request race with normal users for the last entry in this
48          * queue. In that case the flush request will take the last entry and
49          * schedule a flush, which means that we will get around sending the
50          * message as soon as the user finishes the reservation.
51          */
52         Preconditions.checkArgument(maxQueue > 1);
53         this.baseXid = baseXid;
54         this.endXid = baseXid + maxQueue;
55         this.reserve = maxQueue - 1;
56         this.manager = Preconditions.checkNotNull(manager);
57         queue = new OutboundQueueEntry[maxQueue];
58         for (int i = 0; i < maxQueue; ++i) {
59             queue[i] = new OutboundQueueEntry();
60         }
61     }
62
63     private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
64         this.manager = Preconditions.checkNotNull(manager);
65         this.queue = Preconditions.checkNotNull(queue);
66         this.baseXid = baseXid;
67         this.endXid = baseXid + queue.length;
68         this.reserve = queue.length - 1;
69     }
70
71     void retire() {
72         for (OutboundQueueEntry element : queue) {
73             element.reset();
74         }
75     }
76
77     OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
78         return new OutboundQueueImpl(manager, baseXid, queue);
79     }
80
81     @Override
82     public Long reserveEntry() {
83         return reserveEntry(false);
84     }
85
86     @Override
87     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
88         final int offset = (int)(xid - baseXid);
89         if (message != null) {
90             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
91         }
92
93         final int ro = reserveOffset;
94         Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
95
96         final OutboundQueueEntry entry = queue[offset];
97         entry.commit(message, callback);
98         LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
99
100         if (entry.isBarrier()) {
101             int my = offset;
102             for (;;) {
103                 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
104                 if (prev < my) {
105                     LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
106                     break;
107                 }
108
109                 // We have traveled back, recover
110                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
111                 my = prev;
112             }
113         }
114
115         manager.ensureFlushing(this);
116     }
117
118     private Long reserveEntry(final boolean forBarrier) {
119         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
120         if (offset >= reserve) {
121             if (forBarrier) {
122                 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
123                 return endXid;
124             } else {
125                 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
126                 return null;
127             }
128         }
129
130         final Long xid = baseXid + offset;
131         LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
132         return xid;
133     }
134
135     Long reserveBarrierIfNeeded() {
136         final int bo = barrierOffset;
137         if (bo >= flushOffset) {
138             LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
139             return null;
140         } else {
141             return reserveEntry(true);
142         }
143     }
144
145     int startShutdown() {
146         // Increment the offset by the queue size, hence preventing any normal
147         // allocations. We should not be seeing a barrier reservation after this
148         // and if there is one issued, we can disregard it.
149         final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
150
151         // If this offset is larger than reserve, trim it. That is not an accurate
152         // view of which slot was actually "reserved", but it indicates at which
153         // entry we can declare the queue flushed (e.g. at the emergency slot).
154         return offset > reserve ? reserve : offset;
155     }
156
157     boolean isShutdown(final int offset) {
158         // This queue is shutdown if the flushOffset (e.g. the next entry to
159         // be flushed) points to the offset 'reserved' in startShutdown()
160         return flushOffset >= offset;
161     }
162
163     /**
164      * An empty queue is a queue which has no further unflushed entries.
165      *
166      * @return True if this queue does not have unprocessed entries.
167      */
168     private boolean isEmpty() {
169         int ro = reserveOffset;
170         if (ro >= reserve) {
171             if (queue[reserve].isCommitted()) {
172                 ro = reserve + 1;
173             } else {
174                 ro = reserve;
175             }
176         }
177
178         LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
179         return ro <= flushOffset;
180     }
181
182     /**
183      * A queue is finished when all of its entries have been completed.
184      *
185      * @return False if there are any uncompleted requests.
186      */
187     boolean isFinished() {
188         if (completeCount < reserve) {
189             return false;
190         }
191
192         // We need to check if the last entry was used
193         final OutboundQueueEntry last = queue[reserve];
194         return !last.isCommitted() || last.isCompleted();
195     }
196
197     boolean isFlushed() {
198         LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
199         if (flushOffset < reserve) {
200             return false;
201         }
202
203         // flushOffset implied == reserve
204         return flushOffset >= queue.length || !queue[reserve].isCommitted();
205     }
206
207     boolean needsFlush() {
208         if (flushOffset < reserve) {
209             return queue[flushOffset].isCommitted();
210         }
211
212         if (isFlushed()) {
213             LOG.trace("Queue {} is flushed, schedule a replace", this);
214             return true;
215         }
216         if (isFinished()) {
217             LOG.trace("Queue {} is finished, schedule a cleanup", this);
218             return true;
219         }
220
221         return false;
222     }
223
224     OfHeader flushEntry() {
225         for (;;) {
226             // No message ready
227             if (isEmpty()) {
228                 LOG.trace("Flushed all reserved entries up to {}", flushOffset);
229                 return null;
230             }
231
232             final OutboundQueueEntry entry = queue[flushOffset];
233             if (!entry.isCommitted()) {
234                 LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
235                 return null;
236             }
237
238             final OfHeader msg = entry.takeMessage();
239             flushOffset++;
240             if (msg != null) {
241                 return msg;
242             }
243
244             LOG.trace("Null message, skipping to offset {}", flushOffset);
245         }
246     }
247
248     // Argument is 'long' to explicitly convert before performing operations
249     private boolean xidInRange(final long xid) {
250         return xid < endXid && (xid >= baseXid || baseXid > endXid);
251     }
252
253     private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
254         if (response instanceof Error) {
255             final Error err = (Error)response;
256             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
257             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
258             return true;
259         } else {
260             return entry.complete(response);
261         }
262     }
263
264     /**
265      * Return the request entry corresponding to a response. Returns null
266      * if there is no request matching the response.
267      *
268      * @param response Response message
269      * @return Matching request entry, or null if no match is found.
270      */
271     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
272         final Long xid = response.getXid();
273         if (!xidInRange(xid)) {
274             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
275             return null;
276         }
277
278         final int offset = (int)(xid - baseXid);
279         final OutboundQueueEntry entry = queue[offset];
280         if (entry.isCompleted()) {
281             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
282             return null;
283         }
284
285         if (entry.isBarrier()) {
286             // This has been a barrier -- make sure we complete all preceding requests.
287             // XXX: Barriers are expected to complete in one message.
288             //      If this assumption is changed, this logic will need to be expanded
289             //      to ensure that the requests implied by the barrier are reported as
290             //      completed *after* the barrier.
291             LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
292             completeRequests(offset);
293             lastBarrierOffset = offset;
294
295             final boolean success = completeEntry(entry, response);
296             Verify.verify(success, "Barrier request failed to complete");
297             completeCount++;
298         } else if (completeEntry(entry, response)) {
299             completeCount++;
300         }
301
302         return entry;
303     }
304
305     private void completeRequests(final int toOffset) {
306         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
307             final OutboundQueueEntry entry = queue[i];
308             if (!entry.isCompleted() && entry.complete(null)) {
309                 completeCount++;
310             }
311         }
312     }
313
314     void completeAll() {
315         completeRequests(queue.length);
316     }
317
318     int failAll(final OutboundQueueException cause) {
319         int ret = 0;
320         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
321             final OutboundQueueEntry entry = queue[i];
322             if (!entry.isCommitted()) {
323                 break;
324             }
325
326             if (!entry.isCompleted()) {
327                 entry.fail(cause);
328                 ret++;
329             }
330         }
331
332         return ret;
333     }
334 }