2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FutureCallback;
12 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
13 import java.util.concurrent.locks.LockSupport;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
20 final class OutboundQueueImpl implements OutboundQueue {
// Class-wide SLF4J logger.
21 private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
// Atomic accessor for the volatile reserveOffset field (the constant is named CURRENT_OFFSET,
// but the field it targets is "reserveOffset").
22 private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
23 AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
// Atomic accessor for the volatile barrierOffset field.
24 private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
25 AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
// Park time, in nanoseconds, handed to LockSupport.parkNanos() by flushEntry() between
// checks of a slot that is not committed yet.
26 private static final long FLUSH_RETRY_NANOS = 1L;
// Owning manager: notified through ensureFlushing() after a commit and passed on by reuse().
27 private final OutboundQueueManager<?> manager;
// Entry slots; the slot for a given XID is queue[xid - baseXid].
28 private final OutboundQueueEntry[] queue;
// XID that maps to slot 0.
29 private final long baseXid;
// baseXid plus the number of slots; xidInRance() treats it as an exclusive upper bound.
30 private final long endXid;
// Index of the last slot (slot count - 1); offsets at or above it get special handling
// in reserveEntry(boolean).
31 private final int reserve;
33 // Updated concurrently
// Set through BARRIER_OFFSET_UPDATER when a barrier entry is committed; starts at -1.
34 private volatile int barrierOffset = -1;
// Next offset to hand out; advanced with getAndIncrement in reserveEntry(boolean).
35 private volatile int reserveOffset = 0;
37 // Updated from Netty only
// Index of the next slot flushEntry() will look at.
38 private int flushOffset;
// Compared against reserve by isFinished(); presumably counts completed entries —
// NOTE(review): confirm against the code that increments it.
39 private int completeCount;
// Creates a queue backed by maxQueue newly allocated entries.
//
// manager  - owning queue manager; checkNotNull rejects null
// baseXid  - XID that maps to slot 0
// maxQueue - number of slots; checkArgument requires more than one, because the
//            last slot is set aside as the emergency entry described below
41 OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
43 * We use the last entry as an emergency should a timeout-triggered
44 * flush request race with normal users for the last entry in this
45 * queue. In that case the flush request will take the last entry and
46 * schedule a flush, which means that we will get around sending the
47 * message as soon as the user finishes the reservation.
49 Preconditions.checkArgument(maxQueue > 1);
50 this.baseXid = baseXid;
// First XID past the last slot.
51 this.endXid = baseXid + maxQueue;
// Index of the last (emergency) slot.
52 this.reserve = maxQueue - 1;
53 this.manager = Preconditions.checkNotNull(manager);
54 queue = new OutboundQueueEntry[maxQueue];
// Populate every slot up front.
55 for (int i = 0; i < maxQueue; ++i) {
56 queue[i] = new OutboundQueueEntry();
// Builds a queue over an existing entry array instead of allocating a new one;
// this is the constructor reuse() calls. Both manager and queue are rejected by
// checkNotNull when null.
60 private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
61 this.manager = Preconditions.checkNotNull(manager);
62 this.queue = Preconditions.checkNotNull(queue);
63 this.baseXid = baseXid;
// Upper XID bound and emergency-slot index are derived from the adopted array's length.
64 this.endXid = baseXid + queue.length;
65 this.reserve = queue.length - 1;
// NOTE(review): the loop body is not visible here — presumably each adopted entry is
// reset before being handed out again; confirm.
66 for (OutboundQueueEntry element : queue) {
// Returns a new queue object for the XID window starting at baseXid that shares this
// queue's manager and entry array (via the private constructor).
71 OutboundQueueImpl reuse(final long baseXid) {
72 return new OutboundQueueImpl(manager, baseXid, queue);
// Reserves a slot for an ordinary (non-barrier) request by delegating to
// reserveEntry(false); returns whatever that overload yields.
76 public Long reserveEntry() {
77 return reserveEntry(false);
// Commits a message into the slot belonging to xid.
//
// xid      - XID of the slot; unboxed below, so null causes a NullPointerException
// message  - message to commit, may be null; when non-null its XID must equal xid,
//            otherwise checkArgument throws IllegalArgumentException
// callback - passed through to the entry's commit()
81 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
// Slot index is the distance from the queue's base XID.
82 final int offset = (int)(xid - baseXid);
83 if (message != null) {
84 Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
87 final OutboundQueueEntry entry = queue[offset];
88 entry.commit(message, callback);
89 LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
// Barrier entries publish an offset through barrierOffset.
// NOTE(review): the declaration of `my` and the surrounding loop/condition are not
// visible here; the "traveled back" comment suggests a retry when the previous value
// was larger — confirm against the full method.
91 if (entry.isBarrier()) {
// Atomically swap in `my` and get the value it replaced.
94 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
96 LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
100 // We have traveled back, recover
// Ask the manager to make sure this queue gets flushed.
105 manager.ensureFlushing(this);
// Takes the next offset from reserveOffset and turns it into an XID.
//
// forBarrier - NOTE(review): presumably selects between the "using emergency slot" and
//              "not allowing reservation" outcomes once the ordinary slots are used up;
//              the branch and both return statements are not visible here — confirm.
108 private Long reserveEntry(final boolean forBarrier) {
// Atomic post-increment: each caller gets a distinct offset.
109 final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
// Offsets at or beyond the last slot index are not ordinary reservations.
110 if (offset >= reserve) {
112 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
115 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
// Ordinary case: the XID is the base XID plus the reserved offset.
120 final Long xid = baseXid + offset;
121 LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
// Reserves a slot through reserveEntry(true) when needed.
// NOTE(review): what is returned on the "Barrier found" path, and whether the
// reserveEntry(true) call is its else-branch, is not visible here — presumably
// null / yes; confirm.
125 Long reserveBarrierIfNeeded() {
// Single read of the volatile field so the comparison and the log line agree.
126 final int bo = barrierOffset;
// A recorded barrier offset at or beyond the flush position.
127 if (bo >= flushOffset) {
128 LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
131 return reserveEntry(true);
136 * An empty queue is a queue which has no further unflushed entries.
138 * @return True if this queue does not have unprocessed entries.
// Snapshot of the volatile reservation counter.
// NOTE(review): declared non-final, so it is presumably adjusted before the final
// comparison depending on the last slot's state; those assignments are not visible — confirm.
141 int ro = reserveOffset;
// The last slot (index reserve) is inspected on its own.
143 if (queue[reserve].isCommitted()) {
150 LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
// True once the flush position has reached the effective reservation offset.
151 return ro <= flushOffset;
155 * A queue is finished when all of its entries have been completed.
157 * @return False if there are any uncompleted requests.
159 boolean isFinished() {
// Compares the completion counter with the number of ordinary slots.
// NOTE(review): the branch body is not visible; given the @return text it presumably
// returns false — confirm.
160 if (completeCount < reserve) {
164 // We need to check if the last entry was used
165 final OutboundQueueEntry last = queue[reserve];
// The last slot only holds the queue open if it was committed and has not completed.
166 return !last.isCommitted() || last.isCompleted();
169 boolean isFlushed() {
170 LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
171 if (flushOffset < reserve) {
175 // flushOffset implied == reserve
176 return flushOffset >= queue.length || !queue[reserve].isCommitted();
// Hands out the message stored in the slot at flushOffset and advances flushOffset.
// NOTE(review): several statements of this method (the early-exit conditions and the
// returns) are not visible here; the comments below cover only what is shown.
179 OfHeader flushEntry() {
183 LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
// Presumably distinguishes the "retrying" pass from the "giving up" pass below — confirm.
187 boolean retry = true;
// Loop for as long as the slot at flushOffset has not been committed.
188 while (!queue[flushOffset].isCommitted()) {
190 LOG.debug("Offset {} not ready yet, giving up", flushOffset);
194 LOG.debug("Offset {} not ready yet, retrying", flushOffset);
// Brief park (FLUSH_RETRY_NANOS) before the slot is checked again.
195 LockSupport.parkNanos(FLUSH_RETRY_NANOS);
// Read the slot's message and move the flush position forward in one expression.
199 final OfHeader msg = queue[flushOffset++].getMessage();
// Tells whether xid falls inside this queue's XID window.
// True when xid is below endXid and either xid is at least baseXid or baseXid is
// greater than endXid. Since endXid is computed as baseXid plus the slot count, the
// latter can only hold if that addition wrapped around.
// NOTE(review): the name looks like a typo for xidInRange; renaming also requires
// updating the call in pairRequest().
206 private boolean xidInRance(final long xid) {
207 return xid < endXid && (xid >= baseXid || baseXid > endXid);
211 * Return the request entry corresponding to a response. Returns null
212 * if there is no request matching the response.
214 * @param response Response message
215 * @return Matching request entry, or null if no match is found.
217 OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
218 final Long xid = response.getXid();
// XIDs outside this queue's window are not handled here.
219 if (!xidInRance(xid)) {
220 LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
// Slot index is the distance from the queue's base XID.
224 final int offset = (int)(xid - baseXid);
225 final OutboundQueueEntry entry = queue[offset];
// A slot that already completed does not take another response.
226 if (entry.isCompleted()) {
227 LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
// NOTE(review): the statements guarded by a successful complete() are only partly
// visible; presumably completeCount is updated somewhere in here — confirm.
231 if (entry.complete(response)) {
234 // This has been a barrier -- make sure we complete all preceding requests
235 if (entry.isBarrier()) {
236 LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
// Walk every slot before the barrier's own offset.
237 for (int i = 0; i < offset; ++i) {
238 final OutboundQueueEntry e = queue[i];
// Earlier entries still open are completed with a null response.
239 if (!e.isCompleted() && e.complete(null)) {
249 for (OutboundQueueEntry entry : queue) {
250 if (!entry.isCompleted() && entry.complete(null)) {
256 int failAll(final Throwable cause) {
258 for (OutboundQueueEntry entry : queue) {
259 if (!entry.isCompleted()) {