Add method to register listener for unknown msg
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.util.concurrent.FutureCallback;
11
12 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
13 import java.util.function.Function;
14
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
21     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
22     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
23
24     private volatile long barrierXid = -1;
25
26     StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
27         super(manager);
28     }
29
30     /*
31      * This method is expected to be called from multiple threads concurrently
32      */
33     @Override
34     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
35             final Function<OfHeader, Boolean> isCompletedFunction) {
36         final OutboundQueueEntry entry = getEntry(xid);
37
38         entry.commit(message, callback, isCompletedFunction);
39         if (entry.isBarrier()) {
40             long my = xid;
41             for (;;) {
42                 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
43                 if (prev < my) {
44                     LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
45                     break;
46                 }
47
48                 // We have traveled back, recover
49                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
50                 my = prev;
51             }
52         }
53
54         LOG.trace("Queue {} committed XID {}", this, xid);
55         manager.ensureFlushing();
56     }
57
58     Long reserveBarrierIfNeeded() {
59         if (isBarrierNeeded()) {
60             return reserveEntry();
61         }
62         return null;
63     }
64
65     /**
66      * Checks if Barrier Request is the last message enqueued. If not, one needs
67      * to be scheduled in order to collect data about previous messages.
68      * @return true if last enqueued message is Barrier Request, false otherwise
69      */
70     boolean isBarrierNeeded() {
71         final long bXid = barrierXid;
72         final long fXid = firstSegment.getBaseXid() + flushOffset;
73         if (bXid >= fXid) {
74             LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
75             return false;
76         }
77         return true;
78     }
79 }