2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.util.concurrent.FutureCallback;
12 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
13 import java.util.function.Function;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
20 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
21 private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
22 private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
24 private volatile long barrierXid = -1;
26 StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
31 * This method is expected to be called from multiple threads concurrently
34 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
35 final Function<OfHeader, Boolean> isCompletedFunction) {
36 final OutboundQueueEntry entry = getEntry(xid);
38 entry.commit(message, callback, isCompletedFunction);
39 if (entry.isBarrier()) {
42 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
44 LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
48 // We have traveled back, recover
49 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
54 LOG.trace("Queue {} committed XID {}", this, xid);
55 manager.ensureFlushing();
58 Long reserveBarrierIfNeeded() {
59 if (isBarrierNeeded()) {
60 return reserveEntry();
66 * Checks if Barrier Request is the last message enqueued. If not, one needs
67 * to be scheduled in order to collect data about previous messages.
68 * @return true if last enqueued message is Barrier Request, false otherwise
70 boolean isBarrierNeeded() {
71 final long bXid = barrierXid;
72 final long fXid = firstSegment.getBaseXid() + flushOffset;
74 LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);