Merge "Barrier turn on/off - move more functionality from StackedOutboundQueue" into...
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import java.util.Iterator;
14 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
20     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
21     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
22
23     private volatile long barrierXid = -1;
24
25     StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
26         super(manager);
27     }
28
29     /*
30      * This method is expected to be called from multiple threads concurrently
31      */
32     @Override
33     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
34         final StackedSegment fastSegment = firstSegment;
35         final long calcOffset = xid - fastSegment.getBaseXid();
36         Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
37
38         Verify.verify(calcOffset <= Integer.MAX_VALUE);
39         final int fastOffset = (int) calcOffset;
40
41         final OutboundQueueEntry entry;
42         if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
43             LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
44
45             final StackedSegment segment;
46             final int slowOffset;
47             synchronized (unflushedSegments) {
48                 final StackedSegment slowSegment = firstSegment;
49                 final long slowCalcOffset = xid - slowSegment.getBaseXid();
50                 Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
51                 slowOffset = (int) slowCalcOffset;
52
53                 LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
54                 segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
55             }
56
57             final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
58             entry = segment.getEntry(segOffset);
59             LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
60         } else {
61             entry = fastSegment.getEntry(fastOffset);
62         }
63
64         entry.commit(message, callback);
65         if (entry.isBarrier()) {
66             long my = xid;
67             for (;;) {
68                 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
69                 if (prev < my) {
70                     LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
71                     break;
72                 }
73
74                 // We have traveled back, recover
75                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
76                 my = prev;
77             }
78         }
79
80         LOG.trace("Queue {} committed XID {}", this, xid);
81         manager.ensureFlushing();
82     }
83
84     Long reserveBarrierIfNeeded() {
85         final long bXid = barrierXid;
86         final long fXid = firstSegment.getBaseXid() + flushOffset;
87         if (bXid >= fXid) {
88             LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
89             return null;
90         }
91         return reserveEntry();
92     }
93
94     @Override
95     boolean pairRequest(final OfHeader message) {
96         Iterator<StackedSegment> it = uncompletedSegments.iterator();
97         while (it.hasNext()) {
98             final StackedSegment queue = it.next();
99             final OutboundQueueEntry entry = queue.pairRequest(message);
100             if (entry == null) {
101                 continue;
102             }
103
104             LOG.trace("Queue {} accepted response {}", queue, message);
105
106             // This has been a barrier request, we need to flush all
107             // previous queues
108             if (entry.isBarrier() && uncompletedSegments.size() > 1) {
109                 LOG.trace("Queue {} indicated request was a barrier", queue);
110
111                 it = uncompletedSegments.iterator();
112                 while (it.hasNext()) {
113                     final StackedSegment q = it.next();
114
115                     // We want to complete all queues before the current one, we will
116                     // complete the current queue below
117                     if (!queue.equals(q)) {
118                         LOG.trace("Queue {} is implied finished", q);
119                         q.completeAll();
120                         it.remove();
121                         q.recycle();
122                     } else {
123                         break;
124                     }
125                 }
126             }
127
128             if (queue.isComplete()) {
129                 LOG.trace("Queue {} is finished", queue);
130                 it.remove();
131                 queue.recycle();
132             }
133
134             return true;
135         }
136
137         LOG.debug("Failed to find completion for message {}", message);
138         return false;
139     }
140 }