2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
17 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
// Outbound queue backed by a fixed array of OutboundQueueEntry slots; the slot at
// offset i is addressed by XID baseXid + i.
23 final class OutboundQueueImpl implements OutboundQueue {
24 private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
// Field updaters giving atomic access to the volatile int fields declared below;
// the string arguments must match those field names exactly.
25 private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
26 AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
27 private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
28 AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
29 private final OutboundQueueManager<?> manager;
30 private final OutboundQueueEntry[] queue;
// XID of the slot at offset 0.
31 private final long baseXid;
// baseXid plus the queue length: the exclusive upper XID bound tested by xidInRange().
32 private final long endXid;
// Index of the last slot (queue length - 1); reservations at or beyond it take the
// special path in reserveEntry(boolean).
33 private final int reserve;
35 // Updated concurrently
// Offset stored by commitEntry() when a barrier entry is committed; initially -1.
36 private volatile int barrierOffset = -1;
// Next offset to hand out; advanced by reserveEntry(boolean) and startShutdown().
37 private volatile int reserveOffset = 0;
39 // Updated from Netty only
// Offset of the next entry to be flushed.
40 private int flushOffset;
// Compared against reserve by isFinished(); presumably the number of completed
// entries -- its updates are outside the visible lines, TODO confirm.
41 private int completeCount;
// Offset of the most recent barrier whose response was paired by pairRequest();
// -1 while there is none.
42 private int lastBarrierOffset = -1;
// Creates a queue of maxQueue freshly allocated entries whose XIDs start at baseXid.
// maxQueue must be greater than 1 and manager must be non-null; both are enforced
// through Preconditions.
44 OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
46 * We use the last entry as an emergency should a timeout-triggered
47 * flush request race with normal users for the last entry in this
48 * queue. In that case the flush request will take the last entry and
49 * schedule a flush, which means that we will get around sending the
50 * message as soon as the user finishes the reservation.
52 Preconditions.checkArgument(maxQueue > 1);
53 this.baseXid = baseXid;
54 this.endXid = baseXid + maxQueue;
// The last index is set aside as the emergency slot described above.
55 this.reserve = maxQueue - 1;
56 this.manager = Preconditions.checkNotNull(manager);
57 queue = new OutboundQueueEntry[maxQueue];
58 for (int i = 0; i < maxQueue; ++i) {
59 queue[i] = new OutboundQueueEntry();
// Builds a queue on top of an existing entry array instead of allocating a new one
// (this is the constructor reuse() calls). The array is stored as-is, not copied.
63 private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
64 this.manager = Preconditions.checkNotNull(manager);
65 this.queue = Preconditions.checkNotNull(queue);
66 this.baseXid = baseXid;
67 this.endXid = baseXid + queue.length;
68 this.reserve = queue.length - 1;
// NOTE(review): the body of this loop is not visible in this listing; presumably
// each recycled entry is reset before reuse -- confirm against the full file.
72 for (OutboundQueueEntry element : queue) {
// Returns a new OutboundQueueImpl for the given manager and base XID that shares
// this instance's entry array (the array reference is passed through, not copied).
77 OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
78 return new OutboundQueueImpl(manager, baseXid, queue);
// Reserves a slot for a regular (non-barrier) request by delegating to
// reserveEntry(false) and returning its result unchanged.
82 public Long reserveEntry() {
83 return reserveEntry(false);
// Commits a message (which may be null) and its callback into the slot addressed by
// xid, records barrier commits in barrierOffset, and calls manager.ensureFlushing(this).
// Throws IllegalArgumentException (via Preconditions.checkArgument) when a non-null
// message carries a different XID or when the slot offset is not below reserveOffset.
87 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
// Slot index relative to the first XID of this queue.
88 final int offset = (int)(xid - baseXid);
89 if (message != null) {
90 Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
// Single read of the volatile field so the check and the log line agree.
93 final int ro = reserveOffset;
// NOTE(review): only the upper bound is checked; a negative offset (xid below
// baseXid) passes this check and reaches the array access below -- confirm callers
// only pass XIDs obtained from reserveEntry().
94 Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
96 final OutboundQueueEntry entry = queue[offset];
97 entry.commit(message, callback);
98 LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
// Barrier handling: the new offset is swapped into barrierOffset atomically. Parts of
// this branch (including the declaration of 'my') are not visible in this listing;
// judging by the log messages it retries when the previous value is not smaller.
100 if (entry.isBarrier()) {
103 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
105 LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
109 // We have traveled back, recover
110 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
115 manager.ensureFlushing(this);
// Atomically claims the next offset (getAndIncrement on reserveOffset) and maps it to
// the XID baseXid + offset. Offsets at or beyond 'reserve' take the special branch:
// per the log messages it either hands out the emergency slot or refuses the
// reservation. The deciding condition and the return statements are not visible in
// this listing -- presumably forBarrier selects the emergency slot; TODO confirm.
118 private Long reserveEntry(final boolean forBarrier) {
119 final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
120 if (offset >= reserve) {
122 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
125 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
130 final Long xid = baseXid + offset;
131 LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
// Reserves a barrier slot via reserveEntry(true) unless a committed barrier is already
// waiting at or beyond flushOffset. What is returned on the "barrier found" path is
// not visible in this listing.
// NOTE(review): reads flushOffset, which is documented as updated from Netty only --
// presumably this must run on that thread; confirm.
135 Long reserveBarrierIfNeeded() {
// Single read of the volatile field.
136 final int bo = barrierOffset;
137 if (bo >= flushOffset) {
138 LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
141 return reserveEntry(true);
// Begins shutdown by pushing reserveOffset forward by the queue length and returns
// the pre-increment offset, capped at 'reserve'. The returned value is the offset
// isShutdown() compares against flushOffset.
145 int startShutdown() {
146 // Increment the offset by the queue size, hence preventing any normal
147 // allocations. We should not be seeing a barrier reservation after this
148 // and if there is one issued, we can disregard it.
149 final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
151 // If this offset is larger than reserve, trim it. That is not an accurate
152 // view of which slot was actually "reserved", but it indicates at which
153 // entry we can declare the queue flushed (e.g. at the emergency slot).
154 return offset > reserve ? reserve : offset;
// Returns true once flushOffset has reached (or passed) the given offset, which is
// expected to be the value returned by startShutdown().
157 boolean isShutdown(final int offset) {
158 // This queue is shutdown if the flushOffset (e.g. the next entry to
159 // be flushed) points to the offset 'reserved' in startShutdown()
160 return flushOffset >= offset;
164 * An empty queue is a queue which has no further unflushed entries.
166 * @return True if this queue does not have unprocessed entries.
168 private boolean isEmpty() {
// Works on a local copy of the volatile reserveOffset.
169 int ro = reserveOffset;
// NOTE(review): the statements that adjust 'ro' around this emergency-slot check are
// not visible in this listing; only the final comparison below can be relied upon.
171 if (queue[reserve].isCommitted()) {
178 LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
// Empty when everything reserved (after adjustment) has already been flushed.
179 return ro <= flushOffset;
183 * A queue is finished when all of its entries have been completed.
185 * @return False if there are any uncompleted requests.
187 boolean isFinished() {
// Fewer completions than regular slots: the body of this branch is not visible in
// this listing; per the @return text above it presumably returns false.
188 if (completeCount < reserve) {
192 // We need to check if the last entry was used
193 final OutboundQueueEntry last = queue[reserve];
// The emergency slot only matters if it was committed; then it must be completed too.
194 return !last.isCommitted() || last.isCompleted();
// Reports whether flushing has progressed through this queue. The result for
// flushOffset < reserve is not visible in this listing (presumably false). Otherwise
// the queue counts as flushed when flushOffset is past the end of the array or the
// emergency slot was never committed.
197 boolean isFlushed() {
198 LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
199 if (flushOffset < reserve) {
203 // flushOffset implied == reserve
204 return flushOffset >= queue.length || !queue[reserve].isCommitted();
// While regular slots remain (flushOffset < reserve), a flush is needed exactly when
// the entry at flushOffset has been committed. The remaining paths are only partly
// visible in this listing: the trace messages indicate a replacement is scheduled
// when the queue is flushed and a cleanup when it is finished; their conditions and
// return values are not shown.
207 boolean needsFlush() {
208 if (flushOffset < reserve) {
209 return queue[flushOffset].isCommitted();
213 LOG.trace("Queue {} is flushed, schedule a replace", this);
217 LOG.trace("Queue {} is finished, schedule a cleanup", this);
// Takes the message of the entry at flushOffset, if that entry has been committed.
// Several statements (loop/guard conditions, flushOffset updates, return values) are
// not visible in this listing; the trace messages indicate it stops when all reserved
// entries are flushed or the current entry is not yet committed, and skips entries
// whose message is null.
224 OfHeader flushEntry() {
228 LOG.trace("Flushed all reserved entries up to {}", flushOffset);
232 final OutboundQueueEntry entry = queue[flushOffset];
233 if (!entry.isCommitted()) {
234 LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
238 final OfHeader msg = entry.takeMessage();
244 LOG.trace("Null message, skipping to offset {}", flushOffset);
// Tests whether xid falls into this queue's XID window [baseXid, endXid).
// The 'baseXid > endXid' alternative can only hold when computing endXid
// (baseXid + queue length) overflowed; in that case the lower bound is waived --
// presumably to tolerate XID wrap-around, TODO confirm.
248 // Argument is 'long' to explicitly convert before performing operations
249 private boolean xidInRange(final long xid) {
250 return xid < endXid && (xid >= baseXid || baseXid > endXid);
// Applies a response to an entry. An Error response is logged and the entry is failed
// with a DeviceRequestFailedException wrapping it; the value returned on that path is
// not visible in this listing. Any other response is passed to entry.complete() and
// that call's result is returned.
253 private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
254 if (response instanceof Error) {
255 final Error err = (Error)response;
256 LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
257 entry.fail(new DeviceRequestFailedException("Device-side failure", err));
260 return entry.complete(response);
265 * Return the request entry corresponding to a response. Returns null
266 * if there is no request matching the response.
268 * @param response Response message
269 * @return Matching request entry, or null if no match is found.
271 OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
272 final Long xid = response.getXid();
// NOTE(review): xid is a boxed Long and is unboxed for xidInRange(long); a response
// without an XID would raise NullPointerException here -- confirm getXid() is never null.
273 if (!xidInRange(xid)) {
274 LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
// Slot index relative to the first XID of this queue.
278 final int offset = (int)(xid - baseXid);
279 final OutboundQueueEntry entry = queue[offset];
// Entries that are already completed do not accept further responses.
280 if (entry.isCompleted()) {
281 LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
285 if (entry.isBarrier()) {
286 // This has been a barrier -- make sure we complete all preceding requests.
287 // XXX: Barriers are expected to complete in one message.
288 // If this assumption is changed, this logic will need to be expanded
289 // to ensure that the requests implied by the barrier are reported as
290 // completed *after* the barrier.
291 LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
// Complete everything between the previous barrier and this one, then remember this
// barrier as the new lower bound for the next cascade.
292 completeRequests(offset);
293 lastBarrierOffset = offset;
295 final boolean success = completeEntry(entry, response);
296 Verify.verify(success, "Barrier request failed to complete");
// Non-barrier response; the body of this branch and the return statements are not
// visible in this listing.
298 } else if (completeEntry(entry, response)) {
// Walks the offsets strictly between lastBarrierOffset and toOffset and completes, with
// a null response, every entry that is not completed yet. What happens when
// complete(null) returns true is not visible in this listing -- presumably the
// completion counter is advanced; TODO confirm.
305 private void completeRequests(final int toOffset) {
306 for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
307 final OutboundQueueEntry entry = queue[i];
308 if (!entry.isCompleted() && entry.complete(null)) {
315 completeRequests(queue.length);
318 int failAll(final OutboundQueueException cause) {
320 for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
321 final OutboundQueueEntry entry = queue[i];
322 if (!entry.isCommitted()) {
326 if (!entry.isCompleted()) {