cafd114c1a021219a5f9af69c103771253e19332
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
12 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
17     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
18     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
19
20     private volatile long barrierXid = -1;
21
22     StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
23         super(manager);
24     }
25
26     /*
27      * This method is expected to be called from multiple threads concurrently
28      */
29     @Override
30     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
31         final OutboundQueueEntry entry = getEntry(xid);
32
33         entry.commit(message, callback);
34         if (entry.isBarrier()) {
35             long my = xid;
36             for (;;) {
37                 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
38                 if (prev < my) {
39                     LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
40                     break;
41                 }
42
43                 // We have traveled back, recover
44                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
45                 my = prev;
46             }
47         }
48
49         LOG.trace("Queue {} committed XID {}", this, xid);
50         manager.ensureFlushing();
51     }
52
53     Long reserveBarrierIfNeeded() {
54         if (isBarrierNeeded()) {
55             return reserveEntry();
56         }
57         return null;
58     }
59
60     /**
61      * Checks if Barrier Request is the last message enqueued. If not, one needs
62      * to be scheduled in order to collect data about previous messages.
63      * @return true if last enqueued message is Barrier Request, false otherwise
64      */
65     boolean isBarrierNeeded() {
66         final long bXid = barrierXid;
67         final long fXid = firstSegment.getBaseXid() + flushOffset;
68         if (bXid >= fXid) {
69             LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
70             return false;
71         }
72         return true;
73     }
74 }