2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import io.netty.channel.Channel;
14 import java.util.ArrayList;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import javax.annotation.Nonnull;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 final class StackedOutboundQueue implements OutboundQueue {
27 private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
28 private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
29 private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
31 @GuardedBy("unflushedSegments")
32 private volatile StackedSegment firstSegment;
33 @GuardedBy("unflushedSegments")
34 private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
35 @GuardedBy("unflushedSegments")
36 private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
37 private final OutboundQueueManager<?> manager;
39 private volatile long lastXid = -1;
40 private volatile long barrierXid = -1;
42 @GuardedBy("unflushedSegments")
43 private Integer shutdownOffset;
45 // Accessed from Netty only
46 private int flushOffset;
48 StackedOutboundQueue(final OutboundQueueManager<?> manager) {
49 this.manager = Preconditions.checkNotNull(manager);
50 firstSegment = StackedSegment.create(0L);
51 uncompletedSegments.add(firstSegment);
52 unflushedSegments.add(firstSegment);
55 @GuardedBy("unflushedSegments")
56 private void ensureSegment(final StackedSegment first, final int offset) {
57 final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
58 LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
60 for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
61 final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
62 LOG.debug("Adding segment {}", newSegment);
63 unflushedSegments.add(newSegment);
68 * This method is expected to be called from multiple threads concurrently.
71 public Long reserveEntry() {
72 final long xid = LAST_XID_UPDATER.incrementAndGet(this);
73 final StackedSegment fastSegment = firstSegment;
75 if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
76 LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
78 // Multiple segments, this a slow path
79 synchronized (unflushedSegments) {
80 LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
82 // Shutdown was scheduled, need to fail the reservation
83 if (shutdownOffset != null) {
84 LOG.debug("Queue {} is being shutdown, failing reservation", this);
88 // Ensure we have the appropriate segment for the specified XID
89 final StackedSegment slowSegment = firstSegment;
90 final int slowOffset = (int) (xid - slowSegment.getBaseXid());
91 Verify.verify(slowOffset >= 0);
93 // Now, we let's see if we need to allocate a new segment
94 ensureSegment(slowSegment, slowOffset);
96 LOG.debug("Queue {} slow reservation finished", this);
100 LOG.trace("Queue {} allocated XID {}", this, xid);
105 * This method is expected to be called from multiple threads concurrently
108 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
109 final StackedSegment fastSegment = firstSegment;
110 final long calcOffset = xid - fastSegment.getBaseXid();
111 Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
113 Verify.verify(calcOffset <= Integer.MAX_VALUE);
114 final int fastOffset = (int) calcOffset;
116 final OutboundQueueEntry entry;
117 if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
118 LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
120 final StackedSegment segment;
121 final int slowOffset;
122 synchronized (unflushedSegments) {
123 final StackedSegment slowSegment = firstSegment;
124 final long slowCalcOffset = xid - slowSegment.getBaseXid();
125 Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
126 slowOffset = (int) slowCalcOffset;
128 LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
129 segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
132 final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
133 entry = segment.getEntry(segOffset);
134 LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
136 entry = fastSegment.getEntry(fastOffset);
139 entry.commit(message, callback);
140 if (entry.isBarrier()) {
143 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
145 LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
149 // We have traveled back, recover
150 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
155 LOG.trace("Queue {} committed XID {}", this, xid);
156 manager.ensureFlushing();
160 * Write some entries from the queue to the channel. Guaranteed to run
161 * in the corresponding EventLoop.
163 * @param channel Channel onto which we are writing
165 * @return Number of entries written out
167 int writeEntries(@Nonnull final Channel channel, final long now) {
169 StackedSegment segment = firstSegment;
172 while (channel.isWritable()) {
173 final OutboundQueueEntry entry = segment.getEntry(flushOffset);
174 if (!entry.isCommitted()) {
175 LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
179 LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
180 final OfHeader message = entry.takeMessage();
184 if (message != null) {
185 manager.writeMessage(message, now);
187 entry.complete(null);
190 if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
192 * Slow path: purge the current segment unless it's the last one.
193 * If it is, we leave it for replacement when a new reservation
196 * This costs us two slow paths, but hey, this should be very rare,
197 * so let's keep things simple.
199 synchronized (unflushedSegments) {
200 LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
202 // We may have raced ahead of reservation code and need to allocate a segment
203 ensureSegment(segment, flushOffset);
205 // Remove the segment, update the firstSegment and reset flushOffset
206 final StackedSegment oldSegment = unflushedSegments.remove(0);
207 if (oldSegment.isComplete()) {
208 uncompletedSegments.remove(oldSegment);
209 oldSegment.recycle();
212 // Reset the first segment and add it to the uncompleted list
213 segment = unflushedSegments.get(0);
214 uncompletedSegments.add(segment);
216 // Update the shutdown offset
217 if (shutdownOffset != null) {
218 shutdownOffset -= StackedSegment.SEGMENT_SIZE;
221 // Allow reservations back on the fast path by publishing the new first segment
222 firstSegment = segment;
225 LOG.debug("Queue {} flush moved to segment {}", this, segment);
233 Long reserveBarrierIfNeeded() {
234 final long bXid = barrierXid;
235 final long fXid = firstSegment.getBaseXid() + flushOffset;
237 LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
240 return reserveEntry();
244 boolean pairRequest(final OfHeader message) {
245 Iterator<StackedSegment> it = uncompletedSegments.iterator();
246 while (it.hasNext()) {
247 final StackedSegment queue = it.next();
248 final OutboundQueueEntry entry = queue.pairRequest(message);
253 LOG.trace("Queue {} accepted response {}", queue, message);
255 // This has been a barrier request, we need to flush all
257 if (entry.isBarrier() && uncompletedSegments.size() > 1) {
258 LOG.trace("Queue {} indicated request was a barrier", queue);
260 it = uncompletedSegments.iterator();
261 while (it.hasNext()) {
262 final StackedSegment q = it.next();
264 // We want to complete all queues before the current one, we will
265 // complete the current queue below
266 if (!queue.equals(q)) {
267 LOG.trace("Queue {} is implied finished", q);
277 if (queue.isComplete()) {
278 LOG.trace("Queue {} is finished", queue);
286 LOG.debug("Failed to find completion for message {}", message);
290 long startShutdown(final Channel channel) {
292 * We are dealing with a multi-threaded shutdown, as the user may still
293 * be reserving entries in the queue. We are executing in a netty thread,
294 * so neither flush nor barrier can be running, which is good news.
296 * We will eat up all the slots in the queue here and mark the offset first
297 * reserved offset and free up all the cached queues. We then schedule
298 * the flush task, which will deal with the rest of the shutdown process.
300 synchronized (unflushedSegments) {
301 // Increment the offset by the segment size, preventing fast path allocations,
302 // since we are holding the slow path lock, any reservations will see the queue
303 // in shutdown and fail accordingly.
304 final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
305 shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
307 return lockedShutdownFlush();
311 @GuardedBy("unflushedSegments")
312 private long lockedShutdownFlush() {
316 final Iterator<StackedSegment> it = uncompletedSegments.iterator();
317 while (it.hasNext()) {
318 final StackedSegment segment = it.next();
320 entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
321 if (segment.isComplete()) {
322 LOG.trace("Cleared segment {}", segment);
330 boolean finishShutdown() {
331 synchronized (unflushedSegments) {
332 lockedShutdownFlush();
335 return !needsFlush();
338 boolean needsFlush() {
339 // flushOffset always points to the first entry, which can be changed only
340 // from Netty, so we are fine here.
341 if (firstSegment.getBaseXid() + flushOffset > lastXid) {
345 if (shutdownOffset != null && flushOffset >= shutdownOffset) {
349 return firstSegment.getEntry(flushOffset).isCommitted();