2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.util.concurrent.FutureCallback;
11 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
12 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
16 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
17 private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
18 private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
20 private volatile long barrierXid = -1;
22 StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
27 * This method is expected to be called from multiple threads concurrently
30 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
31 final OutboundQueueEntry entry = getEntry(xid);
33 entry.commit(message, callback);
34 if (entry.isBarrier()) {
37 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
39 LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
43 // We have traveled back, recover
44 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
49 LOG.trace("Queue {} committed XID {}", this, xid);
50 manager.ensureFlushing();
53 Long reserveBarrierIfNeeded() {
54 if (isBarrierNeeded()) {
55 return reserveEntry();
61 * Checks if Barrier Request is the last message enqueued. If not, one needs
62 * to be scheduled in order to collect data about previous messages.
63 * @return true if last enqueued message is Barrier Request, false otherwise
65 boolean isBarrierNeeded() {
66 final long bXid = barrierXid;
67 final long fXid = firstSegment.getBaseXid() + flushOffset;
69 LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);