BUG-3219: do to park thread in flush
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FutureCallback;
12 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
13 import javax.annotation.Nonnull;
14 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 final class OutboundQueueImpl implements OutboundQueue {
20     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
21     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
22             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
23     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
24             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
25     private final OutboundQueueManager<?> manager;
26     private final OutboundQueueEntry[] queue;
27     private final long baseXid;
28     private final long endXid;
29     private final int reserve;
30
31     // Updated concurrently
32     private volatile int barrierOffset = -1;
33     private volatile int reserveOffset = 0;
34
35     // Updated from Netty only
36     private int flushOffset;
37     private int completeCount;
38     private int lastBarrierOffset = -1;
39
40     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
41         /*
42          * We use the last entry as an emergency should a timeout-triggered
43          * flush request race with normal users for the last entry in this
44          * queue. In that case the flush request will take the last entry and
45          * schedule a flush, which means that we will get around sending the
46          * message as soon as the user finishes the reservation.
47          */
48         Preconditions.checkArgument(maxQueue > 1);
49         this.baseXid = baseXid;
50         this.endXid = baseXid + maxQueue;
51         this.reserve = maxQueue - 1;
52         this.manager = Preconditions.checkNotNull(manager);
53         queue = new OutboundQueueEntry[maxQueue];
54         for (int i = 0; i < maxQueue; ++i) {
55             queue[i] = new OutboundQueueEntry();
56         }
57     }
58
59     private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
60         this.manager = Preconditions.checkNotNull(manager);
61         this.queue = Preconditions.checkNotNull(queue);
62         this.baseXid = baseXid;
63         this.endXid = baseXid + queue.length;
64         this.reserve = queue.length - 1;
65         for (OutboundQueueEntry element : queue) {
66             element.reset();
67         }
68     }
69
70     OutboundQueueImpl reuse(final long baseXid) {
71         return new OutboundQueueImpl(manager, baseXid, queue);
72     }
73
74     @Override
75     public Long reserveEntry() {
76         return reserveEntry(false);
77     }
78
79     @Override
80     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
81         final int offset = (int)(xid - baseXid);
82         if (message != null) {
83             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
84         }
85
86         final OutboundQueueEntry entry = queue[offset];
87         entry.commit(message, callback);
88         LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
89
90         if (entry.isBarrier()) {
91             int my = offset;
92             for (;;) {
93                 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
94                 if (prev < my) {
95                     LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
96                     break;
97                 }
98
99                 // We have traveled back, recover
100                 my = prev;
101             }
102         }
103
104         manager.ensureFlushing(this);
105     }
106
107     private Long reserveEntry(final boolean forBarrier) {
108         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
109         if (offset >= reserve) {
110             if (forBarrier) {
111                 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
112                 return endXid;
113             } else {
114                 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
115                 return null;
116             }
117         }
118
119         final Long xid = baseXid + offset;
120         LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
121         return xid;
122     }
123
124     Long reserveBarrierIfNeeded() {
125         final int bo = barrierOffset;
126         if (bo >= flushOffset) {
127             LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
128             return null;
129         } else {
130             return reserveEntry(true);
131         }
132     }
133
134     /**
135      * An empty queue is a queue which has no further unflushed entries.
136      *
137      * @return True if this queue does not have unprocessed entries.
138      */
139     boolean isEmpty() {
140         int ro = reserveOffset;
141         if (ro >= reserve) {
142             if (queue[reserve].isCommitted()) {
143                 ro = reserve + 1;
144             } else {
145                 ro = reserve;
146             }
147         }
148
149         LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
150         return ro <= flushOffset;
151     }
152
153     /**
154      * A queue is finished when all of its entries have been completed.
155      *
156      * @return False if there are any uncompleted requests.
157      */
158     boolean isFinished() {
159         if (completeCount < reserve) {
160             return false;
161         }
162
163         // We need to check if the last entry was used
164         final OutboundQueueEntry last = queue[reserve];
165         return !last.isCommitted() || last.isCompleted();
166     }
167
168     boolean isFlushed() {
169         LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
170         if (flushOffset < reserve) {
171             return false;
172         }
173
174         // flushOffset implied == reserve
175         return flushOffset >= queue.length || !queue[reserve].isCommitted();
176     }
177
178     OfHeader flushEntry() {
179         for (;;) {
180             // No message ready
181             if (isEmpty()) {
182                 LOG.trace("Flushed all reserved entries up to ", flushOffset);
183                 return null;
184             }
185
186             final OutboundQueueEntry entry = queue[flushOffset];
187             if (!entry.isCommitted()) {
188                 LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
189                 return null;
190             }
191
192             final OfHeader msg = entry.getMessage();
193             flushOffset++;
194             if (msg != null) {
195                 return msg;
196             }
197
198             LOG.trace("Null message, skipping to offset {}", flushOffset);
199         }
200     }
201
202     private boolean xidInRance(final long xid) {
203         return xid < endXid && (xid >= baseXid || baseXid > endXid);
204     }
205
206     /**
207      * Return the request entry corresponding to a response. Returns null
208      * if there is no request matching the response.
209      *
210      * @param response Response message
211      * @return Matching request entry, or null if no match is found.
212      */
213     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
214         final Long xid = response.getXid();
215         if (!xidInRance(xid)) {
216             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
217             return null;
218         }
219
220         final int offset = (int)(xid - baseXid);
221         final OutboundQueueEntry entry = queue[offset];
222         if (entry.isCompleted()) {
223             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
224             return null;
225         }
226
227         if (entry.complete(response)) {
228             completeCount++;
229
230             // This has been a barrier -- make sure we complete all preceding requests
231             if (entry.isBarrier()) {
232                 LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
233                 completeRequests(offset);
234                 lastBarrierOffset = offset;
235             }
236         }
237         return entry;
238     }
239
240     private void completeRequests(final int toOffset) {
241         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
242             final OutboundQueueEntry entry = queue[i];
243             if (!entry.isCompleted() && entry.complete(null)) {
244                 completeCount++;
245             }
246         }
247     }
248
249     void completeAll() {
250         completeRequests(queue.length);
251     }
252
253     int failAll(final Throwable cause) {
254         int ret = 0;
255         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
256             final OutboundQueueEntry entry = queue[i];
257             if (!entry.isCompleted()) {
258                 entry.fail(cause);
259                 ret++;
260             }
261         }
262
263         return ret;
264     }
265 }