3c3ceee3fba0c624915984f0ac1d0ef03942ca15
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import io.netty.channel.Channel;
14 import java.util.ArrayList;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import javax.annotation.Nonnull;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 final class StackedOutboundQueue implements OutboundQueue {
27     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
28     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
29     private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
30
31     @GuardedBy("unflushedSegments")
32     private volatile StackedSegment firstSegment;
33     @GuardedBy("unflushedSegments")
34     private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
35     @GuardedBy("unflushedSegments")
36     private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
37     private final OutboundQueueManager<?> manager;
38
39     private volatile long lastXid = -1;
40     private volatile long barrierXid = -1;
41
42     @GuardedBy("unflushedSegments")
43     private Integer shutdownOffset;
44
45     // Accessed from Netty only
46     private int flushOffset;
47
48     StackedOutboundQueue(final OutboundQueueManager<?> manager) {
49         this.manager = Preconditions.checkNotNull(manager);
50         firstSegment = StackedSegment.create(0L);
51         uncompletedSegments.add(firstSegment);
52         unflushedSegments.add(firstSegment);
53     }
54
55     @GuardedBy("unflushedSegments")
56     private void ensureSegment(final StackedSegment first, final int offset) {
57         final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
58         LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
59
60         for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
61             final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
62             LOG.debug("Adding segment {}", newSegment);
63             unflushedSegments.add(newSegment);
64         }
65     }
66
67     /*
68      * This method is expected to be called from multiple threads concurrently.
69      */
70     @Override
71     public Long reserveEntry() {
72         final long xid = LAST_XID_UPDATER.incrementAndGet(this);
73         final StackedSegment fastSegment = firstSegment;
74
75         if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
76             LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
77
78             // Multiple segments, this a slow path
79             synchronized (unflushedSegments) {
80                 LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
81
82                 // Shutdown was scheduled, need to fail the reservation
83                 if (shutdownOffset != null) {
84                     LOG.debug("Queue {} is being shutdown, failing reservation", this);
85                     return null;
86                 }
87
88                 // Ensure we have the appropriate segment for the specified XID
89                 final StackedSegment slowSegment = firstSegment;
90                 final int slowOffset = (int) (xid - slowSegment.getBaseXid());
91                 Verify.verify(slowOffset >= 0);
92
93                 // Now, we let's see if we need to allocate a new segment
94                 ensureSegment(slowSegment, slowOffset);
95
96                 LOG.debug("Queue {} slow reservation finished", this);
97             }
98         }
99
100         LOG.trace("Queue {} allocated XID {}", this, xid);
101         return xid;
102     }
103
104     /*
105      * This method is expected to be called from multiple threads concurrently
106      */
107     @Override
108     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
109         final StackedSegment fastSegment = firstSegment;
110         final long calcOffset = xid - fastSegment.getBaseXid();
111         Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
112
113         Verify.verify(calcOffset <= Integer.MAX_VALUE);
114         final int fastOffset = (int) calcOffset;
115
116         final OutboundQueueEntry entry;
117         if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
118             LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
119
120             final StackedSegment segment;
121             final int slowOffset;
122             synchronized (unflushedSegments) {
123                 final StackedSegment slowSegment = firstSegment;
124                 final long slowCalcOffset = xid - slowSegment.getBaseXid();
125                 Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
126                 slowOffset = (int) slowCalcOffset;
127
128                 LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
129                 segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
130             }
131
132             final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
133             entry = segment.getEntry(segOffset);
134             LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
135         } else {
136             entry = fastSegment.getEntry(fastOffset);
137         }
138
139         entry.commit(message, callback);
140         if (entry.isBarrier()) {
141             long my = xid;
142             for (;;) {
143                 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
144                 if (prev < my) {
145                     LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
146                     break;
147                 }
148
149                 // We have traveled back, recover
150                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
151                 my = prev;
152             }
153         }
154
155         LOG.trace("Queue {} committed XID {}", this, xid);
156         manager.ensureFlushing();
157     }
158
159     /**
160      * Write some entries from the queue to the channel. Guaranteed to run
161      * in the corresponding EventLoop.
162      *
163      * @param channel Channel onto which we are writing
164      * @param now
165      * @return Number of entries written out
166      */
167     int writeEntries(@Nonnull final Channel channel, final long now) {
168         // Local cache
169         StackedSegment segment = firstSegment;
170         int entries = 0;
171
172         while (channel.isWritable()) {
173             final OutboundQueueEntry entry = segment.getEntry(flushOffset);
174             if (!entry.isCommitted()) {
175                 LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
176                 break;
177             }
178
179             LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
180             final OfHeader message = entry.takeMessage();
181             flushOffset++;
182             entries++;
183
184             if (message != null) {
185                 manager.writeMessage(message, now);
186             } else {
187                 entry.complete(null);
188             }
189
190             if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
191                 /*
192                  * Slow path: purge the current segment unless it's the last one.
193                  * If it is, we leave it for replacement when a new reservation
194                  * is run on it.
195                  *
196                  * This costs us two slow paths, but hey, this should be very rare,
197                  * so let's keep things simple.
198                  */
199                 synchronized (unflushedSegments) {
200                     LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
201
202                     // We may have raced ahead of reservation code and need to allocate a segment
203                     ensureSegment(segment, flushOffset);
204
205                     // Remove the segment, update the firstSegment and reset flushOffset
206                     final StackedSegment oldSegment = unflushedSegments.remove(0);
207                     if (oldSegment.isComplete()) {
208                         uncompletedSegments.remove(oldSegment);
209                         oldSegment.recycle();
210                     }
211
212                     // Reset the first segment and add it to the uncompleted list
213                     segment = unflushedSegments.get(0);
214                     uncompletedSegments.add(segment);
215
216                     // Update the shutdown offset
217                     if (shutdownOffset != null) {
218                         shutdownOffset -= StackedSegment.SEGMENT_SIZE;
219                     }
220
221                     // Allow reservations back on the fast path by publishing the new first segment
222                     firstSegment = segment;
223
224                     flushOffset = 0;
225                     LOG.debug("Queue {} flush moved to segment {}", this, segment);
226                 }
227             }
228         }
229
230         return entries;
231     }
232
233     Long reserveBarrierIfNeeded() {
234         final long bXid = barrierXid;
235         final long fXid = firstSegment.getBaseXid() + flushOffset;
236         if (bXid >= fXid) {
237             LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
238             return null;
239         } else {
240             return reserveEntry();
241         }
242     }
243
244     boolean pairRequest(final OfHeader message) {
245         Iterator<StackedSegment> it = uncompletedSegments.iterator();
246         while (it.hasNext()) {
247             final StackedSegment queue = it.next();
248             final OutboundQueueEntry entry = queue.pairRequest(message);
249             if (entry == null) {
250                 continue;
251             }
252
253             LOG.trace("Queue {} accepted response {}", queue, message);
254
255             // This has been a barrier request, we need to flush all
256             // previous queues
257             if (entry.isBarrier() && uncompletedSegments.size() > 1) {
258                 LOG.trace("Queue {} indicated request was a barrier", queue);
259
260                 it = uncompletedSegments.iterator();
261                 while (it.hasNext()) {
262                     final StackedSegment q = it.next();
263
264                     // We want to complete all queues before the current one, we will
265                     // complete the current queue below
266                     if (!queue.equals(q)) {
267                         LOG.trace("Queue {} is implied finished", q);
268                         q.completeAll();
269                         it.remove();
270                         q.recycle();
271                     } else {
272                         break;
273                     }
274                 }
275             }
276
277             if (queue.isComplete()) {
278                 LOG.trace("Queue {} is finished", queue);
279                 it.remove();
280                 queue.recycle();
281             }
282
283             return true;
284         }
285
286         LOG.debug("Failed to find completion for message {}", message);
287         return false;
288     }
289
290     long startShutdown(final Channel channel) {
291         /*
292          * We are dealing with a multi-threaded shutdown, as the user may still
293          * be reserving entries in the queue. We are executing in a netty thread,
294          * so neither flush nor barrier can be running, which is good news.
295          *
296          * We will eat up all the slots in the queue here and mark the offset first
297          * reserved offset and free up all the cached queues. We then schedule
298          * the flush task, which will deal with the rest of the shutdown process.
299          */
300         synchronized (unflushedSegments) {
301             // Increment the offset by the segment size, preventing fast path allocations,
302             // since we are holding the slow path lock, any reservations will see the queue
303             // in shutdown and fail accordingly.
304             final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
305             shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
306
307             return lockedShutdownFlush();
308         }
309     }
310
311     @GuardedBy("unflushedSegments")
312     private long lockedShutdownFlush() {
313         long entries = 0;
314
315         // Fail all queues
316         final Iterator<StackedSegment> it = uncompletedSegments.iterator();
317         while (it.hasNext()) {
318             final StackedSegment segment = it.next();
319
320             entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
321             if (segment.isComplete()) {
322                 LOG.trace("Cleared segment {}", segment);
323                 it.remove();
324             }
325         }
326
327         return entries;
328     }
329
330     boolean finishShutdown() {
331         synchronized (unflushedSegments) {
332             lockedShutdownFlush();
333         }
334
335         return !needsFlush();
336     }
337
338     boolean needsFlush() {
339         // flushOffset always points to the first entry, which can be changed only
340         // from Netty, so we are fine here.
341         if (firstSegment.getBaseXid() + flushOffset > lastXid) {
342             return false;
343         }
344
345         if (shutdownOffset != null && flushOffset >= shutdownOffset) {
346             return false;
347         }
348
349         return firstSegment.getEntry(flushOffset).isCommitted();
350     }
351 }