Optimize barrier-implied flushes
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FutureCallback;
12 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
13 import java.util.concurrent.locks.LockSupport;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 final class OutboundQueueImpl implements OutboundQueue {
21     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
22     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
23             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
24     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
25             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
26     private static final long FLUSH_RETRY_NANOS = 1L;
27     private final OutboundQueueManager<?> manager;
28     private final OutboundQueueEntry[] queue;
29     private final long baseXid;
30     private final long endXid;
31     private final int reserve;
32
33     // Updated concurrently
34     private volatile int barrierOffset = -1;
35     private volatile int reserveOffset = 0;
36
37     // Updated from Netty only
38     private int flushOffset;
39     private int completeCount;
40     private int lastBarrierOffset = -1;
41
42     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
43         /*
44          * We use the last entry as an emergency should a timeout-triggered
45          * flush request race with normal users for the last entry in this
46          * queue. In that case the flush request will take the last entry and
47          * schedule a flush, which means that we will get around sending the
48          * message as soon as the user finishes the reservation.
49          */
50         Preconditions.checkArgument(maxQueue > 1);
51         this.baseXid = baseXid;
52         this.endXid = baseXid + maxQueue;
53         this.reserve = maxQueue - 1;
54         this.manager = Preconditions.checkNotNull(manager);
55         queue = new OutboundQueueEntry[maxQueue];
56         for (int i = 0; i < maxQueue; ++i) {
57             queue[i] = new OutboundQueueEntry();
58         }
59     }
60
61     private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
62         this.manager = Preconditions.checkNotNull(manager);
63         this.queue = Preconditions.checkNotNull(queue);
64         this.baseXid = baseXid;
65         this.endXid = baseXid + queue.length;
66         this.reserve = queue.length - 1;
67         for (OutboundQueueEntry element : queue) {
68             element.reset();
69         }
70     }
71
72     OutboundQueueImpl reuse(final long baseXid) {
73         return new OutboundQueueImpl(manager, baseXid, queue);
74     }
75
76     @Override
77     public Long reserveEntry() {
78         return reserveEntry(false);
79     }
80
81     @Override
82     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
83         final int offset = (int)(xid - baseXid);
84         if (message != null) {
85             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
86         }
87
88         final OutboundQueueEntry entry = queue[offset];
89         entry.commit(message, callback);
90         LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
91
92         if (entry.isBarrier()) {
93             int my = offset;
94             for (;;) {
95                 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
96                 if (prev < my) {
97                     LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
98                     break;
99                 }
100
101                 // We have traveled back, recover
102                 my = prev;
103             }
104         }
105
106         manager.ensureFlushing(this);
107     }
108
109     private Long reserveEntry(final boolean forBarrier) {
110         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
111         if (offset >= reserve) {
112             if (forBarrier) {
113                 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
114                 return endXid;
115             } else {
116                 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
117                 return null;
118             }
119         }
120
121         final Long xid = baseXid + offset;
122         LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
123         return xid;
124     }
125
126     Long reserveBarrierIfNeeded() {
127         final int bo = barrierOffset;
128         if (bo >= flushOffset) {
129             LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
130             return null;
131         } else {
132             return reserveEntry(true);
133         }
134     }
135
136     /**
137      * An empty queue is a queue which has no further unflushed entries.
138      *
139      * @return True if this queue does not have unprocessed entries.
140      */
141     boolean isEmpty() {
142         int ro = reserveOffset;
143         if (ro >= reserve) {
144             if (queue[reserve].isCommitted()) {
145                 ro = reserve + 1;
146             } else {
147                 ro = reserve;
148             }
149         }
150
151         LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
152         return ro <= flushOffset;
153     }
154
155     /**
156      * A queue is finished when all of its entries have been completed.
157      *
158      * @return False if there are any uncompleted requests.
159      */
160     boolean isFinished() {
161         if (completeCount < reserve) {
162             return false;
163         }
164
165         // We need to check if the last entry was used
166         final OutboundQueueEntry last = queue[reserve];
167         return !last.isCommitted() || last.isCompleted();
168     }
169
170     boolean isFlushed() {
171         LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
172         if (flushOffset < reserve) {
173             return false;
174         }
175
176         // flushOffset implied == reserve
177         return flushOffset >= queue.length || !queue[reserve].isCommitted();
178     }
179
180     OfHeader flushEntry() {
181         for (;;) {
182             // No message ready
183             if (isEmpty()) {
184                 LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
185                 return null;
186             }
187
188             boolean retry = true;
189             while (!queue[flushOffset].isCommitted()) {
190                 if (!retry) {
191                     LOG.debug("Offset {} not ready yet, giving up", flushOffset);
192                     return null;
193                 }
194
195                 LOG.debug("Offset {} not ready yet, retrying", flushOffset);
196                 LockSupport.parkNanos(FLUSH_RETRY_NANOS);
197                 retry = false;
198             }
199
200             final OfHeader msg = queue[flushOffset++].getMessage();
201             if (msg != null) {
202                 return msg;
203             }
204         }
205     }
206
207     private boolean xidInRance(final long xid) {
208         return xid < endXid && (xid >= baseXid || baseXid > endXid);
209     }
210
211     /**
212      * Return the request entry corresponding to a response. Returns null
213      * if there is no request matching the response.
214      *
215      * @param response Response message
216      * @return Matching request entry, or null if no match is found.
217      */
218     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
219         final Long xid = response.getXid();
220         if (!xidInRance(xid)) {
221             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
222             return null;
223         }
224
225         final int offset = (int)(xid - baseXid);
226         final OutboundQueueEntry entry = queue[offset];
227         if (entry.isCompleted()) {
228             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
229             return null;
230         }
231
232         if (entry.complete(response)) {
233             completeCount++;
234
235             // This has been a barrier -- make sure we complete all preceding requests
236             if (entry.isBarrier()) {
237                 LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
238                 completeRequests(offset);
239                 lastBarrierOffset = offset;
240             }
241         }
242         return entry;
243     }
244
245     private void completeRequests(final int toOffset) {
246         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
247             final OutboundQueueEntry entry = queue[i];
248             if (!entry.isCompleted() && entry.complete(null)) {
249                 completeCount++;
250             }
251         }
252     }
253
254     void completeAll() {
255         completeRequests(queue.length);
256     }
257
258     int failAll(final Throwable cause) {
259         int ret = 0;
260         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
261             final OutboundQueueEntry entry = queue[i];
262             if (!entry.isCompleted()) {
263                 entry.fail(cause);
264                 ret++;
265             }
266         }
267
268         return ret;
269     }
270 }