Barrier turn on/off - Split OutboundQueueManager
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import io.netty.channel.Channel;
14 import java.util.ArrayList;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import javax.annotation.Nonnull;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 final class StackedOutboundQueue implements OutboundQueue {
27     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
28     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
29     private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
30
31     @GuardedBy("unflushedSegments")
32     private volatile StackedSegment firstSegment;
33     @GuardedBy("unflushedSegments")
34     private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
35     @GuardedBy("unflushedSegments")
36     private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
37     private final AbstractOutboundQueueManager<?> manager;
38
39     private volatile long allocatedXid = -1;
40     private volatile long barrierXid = -1;
41     private volatile long lastXid = -1;
42
43     @GuardedBy("unflushedSegments")
44     private Integer shutdownOffset;
45
46     // Accessed from Netty only
47     private int flushOffset;
48
49     StackedOutboundQueue(final AbstractOutboundQueueManager<?> manager) {
50         this.manager = Preconditions.checkNotNull(manager);
51         firstSegment = StackedSegment.create(0L);
52         uncompletedSegments.add(firstSegment);
53         unflushedSegments.add(firstSegment);
54     }
55
56     @GuardedBy("unflushedSegments")
57     private void ensureSegment(final StackedSegment first, final int offset) {
58         final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
59         LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
60
61         for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
62             final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
63             LOG.debug("Adding segment {}", newSegment);
64             unflushedSegments.add(newSegment);
65         }
66
67         allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid();
68     }
69
70     /*
71      * This method is expected to be called from multiple threads concurrently.
72      */
73     @Override
74     public Long reserveEntry() {
75         final long xid = LAST_XID_UPDATER.incrementAndGet(this);
76         final StackedSegment fastSegment = firstSegment;
77
78         if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
79            if (xid >= allocatedXid) {
80                 // Multiple segments, this a slow path
81                 LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
82
83                 synchronized (unflushedSegments) {
84                     LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
85
86                     // Shutdown was scheduled, need to fail the reservation
87                     if (shutdownOffset != null) {
88                         LOG.debug("Queue {} is being shutdown, failing reservation", this);
89                         return null;
90                     }
91
92                     // Ensure we have the appropriate segment for the specified XID
93                     final StackedSegment slowSegment = firstSegment;
94                     final int slowOffset = (int) (xid - slowSegment.getBaseXid());
95                     Verify.verify(slowOffset >= 0);
96
97                     // Now, we let's see if we need to allocate a new segment
98                     ensureSegment(slowSegment, slowOffset);
99
100                     LOG.debug("Queue {} slow reservation finished", this);
101                 }
102             } else {
103                 LOG.debug("Queue {} XID {} is already backed", this, xid);
104             }
105         }
106
107         LOG.trace("Queue {} allocated XID {}", this, xid);
108         return xid;
109     }
110
111     /*
112      * This method is expected to be called from multiple threads concurrently
113      */
114     @Override
115     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
116         final StackedSegment fastSegment = firstSegment;
117         final long calcOffset = xid - fastSegment.getBaseXid();
118         Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
119
120         Verify.verify(calcOffset <= Integer.MAX_VALUE);
121         final int fastOffset = (int) calcOffset;
122
123         final OutboundQueueEntry entry;
124         if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
125             LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
126
127             final StackedSegment segment;
128             final int slowOffset;
129             synchronized (unflushedSegments) {
130                 final StackedSegment slowSegment = firstSegment;
131                 final long slowCalcOffset = xid - slowSegment.getBaseXid();
132                 Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
133                 slowOffset = (int) slowCalcOffset;
134
135                 LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
136                 segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
137             }
138
139             final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
140             entry = segment.getEntry(segOffset);
141             LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
142         } else {
143             entry = fastSegment.getEntry(fastOffset);
144         }
145
146         entry.commit(message, callback);
147         if (entry.isBarrier()) {
148             long my = xid;
149             for (;;) {
150                 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
151                 if (prev < my) {
152                     LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
153                     break;
154                 }
155
156                 // We have traveled back, recover
157                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
158                 my = prev;
159             }
160         }
161
162         LOG.trace("Queue {} committed XID {}", this, xid);
163         manager.ensureFlushing();
164     }
165
166     /**
167      * Write some entries from the queue to the channel. Guaranteed to run
168      * in the corresponding EventLoop.
169      *
170      * @param channel Channel onto which we are writing
171      * @param now
172      * @return Number of entries written out
173      */
174     int writeEntries(@Nonnull final Channel channel, final long now) {
175         // Local cache
176         StackedSegment segment = firstSegment;
177         int entries = 0;
178
179         while (channel.isWritable()) {
180             final OutboundQueueEntry entry = segment.getEntry(flushOffset);
181             if (!entry.isCommitted()) {
182                 LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
183                 break;
184             }
185
186             LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
187             final OfHeader message = entry.takeMessage();
188             flushOffset++;
189             entries++;
190
191             if (message != null) {
192                 manager.writeMessage(message, now);
193             } else {
194                 entry.complete(null);
195             }
196
197             if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
198                 /*
199                  * Slow path: purge the current segment unless it's the last one.
200                  * If it is, we leave it for replacement when a new reservation
201                  * is run on it.
202                  *
203                  * This costs us two slow paths, but hey, this should be very rare,
204                  * so let's keep things simple.
205                  */
206                 synchronized (unflushedSegments) {
207                     LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
208
209                     // We may have raced ahead of reservation code and need to allocate a segment
210                     ensureSegment(segment, flushOffset);
211
212                     // Remove the segment, update the firstSegment and reset flushOffset
213                     final StackedSegment oldSegment = unflushedSegments.remove(0);
214                     if (oldSegment.isComplete()) {
215                         uncompletedSegments.remove(oldSegment);
216                         oldSegment.recycle();
217                     }
218
219                     // Reset the first segment and add it to the uncompleted list
220                     segment = unflushedSegments.get(0);
221                     uncompletedSegments.add(segment);
222
223                     // Update the shutdown offset
224                     if (shutdownOffset != null) {
225                         shutdownOffset -= StackedSegment.SEGMENT_SIZE;
226                     }
227
228                     // Allow reservations back on the fast path by publishing the new first segment
229                     firstSegment = segment;
230
231                     flushOffset = 0;
232                     LOG.debug("Queue {} flush moved to segment {}", this, segment);
233                 }
234             }
235         }
236
237         return entries;
238     }
239
240     Long reserveBarrierIfNeeded() {
241         final long bXid = barrierXid;
242         final long fXid = firstSegment.getBaseXid() + flushOffset;
243         if (bXid >= fXid) {
244             LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
245             return null;
246         }
247         return reserveEntry();
248     }
249
250     boolean pairRequest(final OfHeader message) {
251         Iterator<StackedSegment> it = uncompletedSegments.iterator();
252         while (it.hasNext()) {
253             final StackedSegment queue = it.next();
254             final OutboundQueueEntry entry = queue.pairRequest(message);
255             if (entry == null) {
256                 continue;
257             }
258
259             LOG.trace("Queue {} accepted response {}", queue, message);
260
261             // This has been a barrier request, we need to flush all
262             // previous queues
263             if (entry.isBarrier() && uncompletedSegments.size() > 1) {
264                 LOG.trace("Queue {} indicated request was a barrier", queue);
265
266                 it = uncompletedSegments.iterator();
267                 while (it.hasNext()) {
268                     final StackedSegment q = it.next();
269
270                     // We want to complete all queues before the current one, we will
271                     // complete the current queue below
272                     if (!queue.equals(q)) {
273                         LOG.trace("Queue {} is implied finished", q);
274                         q.completeAll();
275                         it.remove();
276                         q.recycle();
277                     } else {
278                         break;
279                     }
280                 }
281             }
282
283             if (queue.isComplete()) {
284                 LOG.trace("Queue {} is finished", queue);
285                 it.remove();
286                 queue.recycle();
287             }
288
289             return true;
290         }
291
292         LOG.debug("Failed to find completion for message {}", message);
293         return false;
294     }
295
296     long startShutdown(final Channel channel) {
297         /*
298          * We are dealing with a multi-threaded shutdown, as the user may still
299          * be reserving entries in the queue. We are executing in a netty thread,
300          * so neither flush nor barrier can be running, which is good news.
301          *
302          * We will eat up all the slots in the queue here and mark the offset first
303          * reserved offset and free up all the cached queues. We then schedule
304          * the flush task, which will deal with the rest of the shutdown process.
305          */
306         synchronized (unflushedSegments) {
307             // Increment the offset by the segment size, preventing fast path allocations,
308             // since we are holding the slow path lock, any reservations will see the queue
309             // in shutdown and fail accordingly.
310             final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
311             shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
312
313             return lockedShutdownFlush();
314         }
315     }
316
317     @GuardedBy("unflushedSegments")
318     private long lockedShutdownFlush() {
319         long entries = 0;
320
321         // Fail all queues
322         final Iterator<StackedSegment> it = uncompletedSegments.iterator();
323         while (it.hasNext()) {
324             final StackedSegment segment = it.next();
325
326             entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
327             if (segment.isComplete()) {
328                 LOG.trace("Cleared segment {}", segment);
329                 it.remove();
330             }
331         }
332
333         return entries;
334     }
335
336     boolean finishShutdown() {
337         synchronized (unflushedSegments) {
338             lockedShutdownFlush();
339         }
340
341         return !needsFlush();
342     }
343
344     boolean needsFlush() {
345         // flushOffset always points to the first entry, which can be changed only
346         // from Netty, so we are fine here.
347         if (firstSegment.getBaseXid() + flushOffset > lastXid) {
348             return false;
349         }
350
351         if (shutdownOffset != null && flushOffset >= shutdownOffset) {
352             return false;
353         }
354
355         return firstSegment.getEntry(flushOffset).isCommitted();
356     }
357 }