2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import java.util.Iterator;
14 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
// Outbound queue built on AbstractStackedOutboundQueue. In addition to the inherited state it
// keeps one long field, barrierXid, which commitEntry() swaps atomically and
// reserveBarrierIfNeeded() reads.
19 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
20 private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
// Reflection-based atomic accessor for the barrierXid field below; the field name is passed
// as a string, so it must stay in sync with the field declaration.
21 private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
// Must be a volatile long for AtomicLongFieldUpdater to accept it. Starts at -1.
// NOTE(review): -1 presumably stands for "no barrier recorded yet" - confirm.
23 private volatile long barrierXid = -1;
// NOTE(review): the constructor body is not visible in this listing; presumably it only
// forwards manager to the superclass constructor - confirm.
25 StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
30 * This method is expected to be called from multiple threads concurrently
// Commits message and callback into the queue entry addressed by xid, records the XID in
// barrierXid when that entry is a barrier, and finally asks the manager to ensure flushing.
//
// The entry is located by the offset of xid from the base XID of firstSegment. Offsets below
// StackedSegment.SEGMENT_SIZE are served directly from that segment (fast path); larger
// offsets are resolved while holding the lock on unflushedSegments (slow path).
//
// Preconditions.checkArgument rejects an xid below the first segment's base XID with an
// IllegalArgumentException; the Verify.verify calls throw VerifyException if an offset does
// not fit the expected range.
33 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
// Read firstSegment once into a local so the offset computation and the fast-path lookup
// below refer to the same segment object.
34 final StackedSegment fastSegment = firstSegment;
35 final long calcOffset = xid - fastSegment.getBaseXid();
36 Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
// Guard the narrowing cast to int on the next line.
38 Verify.verify(calcOffset <= Integer.MAX_VALUE);
39 final int fastOffset = (int) calcOffset;
41 final OutboundQueueEntry entry;
// Offset falls outside the first segment: take the slow path.
42 if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
43 LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
45 final StackedSegment segment;
// NOTE(review): the declaration of slowOffset is not visible in this listing.
47 synchronized (unflushedSegments) {
// Re-read firstSegment and recompute the offset under the lock instead of reusing the
// values computed above.
48 final StackedSegment slowSegment = firstSegment;
49 final long slowCalcOffset = xid - slowSegment.getBaseXid();
50 Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
51 slowOffset = (int) slowCalcOffset;
53 LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
// Integer division selects the segment by index within unflushedSegments.
54 segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
// The remainder is the entry's position inside the selected segment.
57 final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
58 entry = segment.getEntry(segOffset);
59 LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
// Fast path: the entry lives in the segment captured at the top of the method.
// NOTE(review): the else line introducing this branch is not visible in this listing.
61 entry = fastSegment.getEntry(fastOffset);
64 entry.commit(message, callback);
65 if (entry.isBarrier()) {
// Atomically store my into barrierXid and obtain the value it replaced.
// NOTE(review): the declaration of my and the control flow around the two log statements
// below are not visible here; the log texts suggest that a previous value >= my triggers a
// retry - confirm against the full file.
68 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
70 LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
74 // We have traveled back, recover
75 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
80 LOG.trace("Queue {} committed XID {}", this, xid);
81 manager.ensureFlushing();
// Compares the recorded barrier XID with the XID at the current flush position and, on one
// of its paths, returns the result of reserveEntry().
// NOTE(review): the condition that guards the log statement and the alternative return value
// are not visible in this listing - confirm against the full file.
84 Long reserveBarrierIfNeeded() {
// Single read of the volatile field so the value logged below is the one that was examined.
85 final long bXid = barrierXid;
// First segment's base XID plus flushOffset; the log message below reports this as the
// current position.
86 final long fXid = firstSegment.getBaseXid() + flushOffset;
88 LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
91 return reserveEntry();
// Tries to match message against the segments in uncompletedSegments, in iteration order,
// by delegating to StackedSegment.pairRequest(). Logs at debug level when no match is
// reported.
// NOTE(review): the check on the returned entry, the completion/removal calls and the return
// statements are not visible in this listing - confirm against the full file.
95 boolean pairRequest(final OfHeader message) {
96 Iterator<StackedSegment> it = uncompletedSegments.iterator();
97 while (it.hasNext()) {
98 final StackedSegment queue = it.next();
99 final OutboundQueueEntry entry = queue.pairRequest(message);
104 LOG.trace("Queue {} accepted response {}", queue, message);
106 // This has been a barrier request, we need to flush all
// Only relevant when more than one segment is still uncompleted.
108 if (entry.isBarrier() && uncompletedSegments.size() > 1) {
109 LOG.trace("Queue {} indicated request was a barrier", queue);
// Restart iteration from the head of uncompletedSegments, reusing the outer iterator
// variable.
111 it = uncompletedSegments.iterator();
112 while (it.hasNext()) {
113 final StackedSegment q = it.next();
115 // We want to complete all queues before the current one, we will
116 // complete the current queue below
117 if (!queue.equals(q)) {
118 LOG.trace("Queue {} is implied finished", q);
// The segment that accepted the response is checked for completion separately.
128 if (queue.isComplete()) {
129 LOG.trace("Queue {} is finished", queue);
137 LOG.debug("Failed to find completion for message {}", message);