2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.base.FinalizableReferenceQueue;
14 import com.google.common.base.FinalizableSoftReference;
15 import com.google.common.base.MoreObjects;
16 import java.lang.ref.Reference;
17 import java.util.concurrent.ConcurrentLinkedDeque;
18 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
/**
 * One fixed-size window of an outbound request queue. A segment owns an array
 * of {@code OutboundQueueEntry} slots and maps each XID in the range starting
 * at {@code baseXid} onto a slot by offset ({@code xid - baseXid}).
 *
 * <p>Backing arrays are held in a static cache of soft references so that a
 * newly created segment can reuse an array instead of allocating a fresh one.
 */
25 final class StackedSegment {
/**
 * Soft reference to a backing array of queue entries. It is registered with a
 * {@link FinalizableReferenceQueue}, so {@link #finalizeReferent()} is invoked
 * once the referenced array has been reclaimed.
 */
26 private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
// Note the argument order: this constructor takes (queue, referent) but the
// superclass constructor is called with (referent, queue).
27 QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
28 super(referent, queue);
// Callback run after the referent has been garbage-collected.
// NOTE(review): the body of this method is not visible in this view; presumably
// it drops this reference from the static cache -- confirm against the full file.
32 public void finalizeReferent() {
38 * Size of each individual segment.
// Number of entries allocated for a new backing array, and the width of the
// XID window covered by one segment (endXid = baseXid + SEGMENT_SIZE).
40 static final int SEGMENT_SIZE = 4096;
42 private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
// Reference queue handed to every QueueRef created for a cached array.
43 private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
// Pool of softly-referenced entry arrays. Arrays are offered at the tail and
// create() polls from the tail, so the most recently cached array is tried first.
44 private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
// Slots of this segment; the slot for a given XID lives at index (xid - baseXid).
46 private final OutboundQueueEntry[] entries;
// First XID covered by this segment.
47 private final long baseXid;
// baseXid + SEGMENT_SIZE; used as the exclusive upper bound in xidInRange().
48 private final long endXid;
50 // Updated from netty only
// Offset of the last barrier entry whose response was processed by pairRequest();
// -1 until the first barrier has been seen.
51 private int lastBarrierOffset = -1;
// Compared against entries.length by isComplete().
// NOTE(review): the statements that increment this counter are not visible in this view.
52 private int completeCount;
/**
 * Creates a segment covering the XID window that starts at {@code baseXid}
 * and ends just before {@code baseXid + SEGMENT_SIZE}.
 *
 * @param baseXid first XID served by this segment
 * @param entries backing array of entries; must not be null
 * @throws NullPointerException if {@code entries} is null
 */
54 StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
55 this.baseXid = baseXid;
56 this.endXid = baseXid + SEGMENT_SIZE;
57 this.entries = requireNonNull(entries);
/**
 * Factory for segments. It first tries to reuse a cached backing array and
 * falls back to allocating a new array of {@code SEGMENT_SIZE} entries.
 *
 * <p>NOTE(review): the control flow around the cache lookup (looping, the check
 * for an empty cache, the check for a cleared reference, and the returns) is
 * not visible in this view -- confirm against the full file before relying on
 * the exact fallback conditions.
 *
 * @param baseXid first XID the new segment will serve
 * @return a segment based at {@code baseXid}
 */
60 static StackedSegment create(final long baseXid) {
61 final StackedSegment ret;
// Take the most recently cached reference from the tail of the deque.
63 final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
// A soft reference may already have been cleared, in which case get() yields null.
68 final OutboundQueueEntry[] cached = item.get();
70 ret = new StackedSegment(baseXid, cached);
71 LOG.trace("Reusing array {} in segment {}", cached, ret);
// Allocation path: build a fresh array and populate every slot with a new entry.
76 final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
77 for (int i = 0; i < SEGMENT_SIZE; ++i) {
78 entries[i] = new OutboundQueueEntry();
81 ret = new StackedSegment(baseXid, entries);
82 LOG.trace("Allocated new segment {}", ret);
/**
 * Renders this segment for diagnostics, listing {@code baseXid},
 * {@code endXid} and {@code completeCount}.
 */
87 public String toString() {
88 return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid)
89 .add("completeCount", completeCount).toString();
/**
 * Returns the entry stored at the given array offset.
 *
 * <p>No range check is made here; an offset outside the backing array raises
 * {@link ArrayIndexOutOfBoundsException}.
 *
 * @param offset index into the backing array (not an XID)
 * @return the entry at {@code offset}
 */
100 OutboundQueueEntry getEntry(final int offset) {
101 return entries[offset];
/**
 * Tests whether an XID belongs to this segment.
 *
 * <p>The XID must be below {@code endXid}, and additionally either at or above
 * {@code baseXid}, or {@code baseXid} must be greater than {@code endXid}.
 * NOTE(review): the {@code baseXid > endXid} clause presumably handles a
 * wrapped-around window -- confirm how XID wrap-around is meant to work.
 *
 * @param xid XID to test
 * @return {@code true} if the XID falls inside this segment's window
 */
104 private boolean xidInRange(final long xid) {
105 return xid < endXid && (xid >= baseXid || baseXid > endXid);
/**
 * Applies a device response to an entry.
 *
 * <p>An {@code Error} response is logged at debug level and the entry is
 * failed with a {@link DeviceRequestFailedException} wrapping the error. Any
 * other response is passed to {@code entry.complete(response)}.
 *
 * <p>NOTE(review): the statements between the {@code fail()} call and the
 * final {@code return} are not visible in this view, so whether the error path
 * returns early cannot be confirmed from here.
 *
 * @param entry entry the response belongs to
 * @param response message received from the device
 * @return on the non-error path, the result of {@code entry.complete(response)}
 */
108 private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
109 if (response instanceof Error) {
110 final Error err = (Error)response;
111 LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(),
112 err.getCodeString());
113 entry.fail(new DeviceRequestFailedException("Device-side failure", err));
116 return entry.complete(response);
/**
 * Looks up the entry for an XID without touching its state.
 *
 * <p>NOTE(review): the statement that follows the debug log in the
 * out-of-range branch is not visible in this view; presumably the method
 * returns early there -- confirm the returned value.
 *
 * @param xid XID to look up
 * @return the entry at offset {@code xid - baseXid} when the XID is in range
 */
119 OutboundQueueEntry findEntry(final long xid) {
120 if (! xidInRange(xid)) {
121 LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
// In range: the array index is the distance from the segment's first XID.
124 final int offset = (int)(xid - baseXid);
125 return entries[offset];
/**
 * Matches a device response to the request entry with the same XID and
 * completes that entry.
 *
 * <p>If the matched entry is a barrier, every not-yet-completed entry between
 * the previous barrier and this one is completed first, the barrier offset is
 * recorded, and only then is the barrier entry itself completed.
 *
 * <p>NOTE(review): the return statements and the body of the final
 * {@code else if} are not visible in this view, so the value returned on each
 * path cannot be confirmed from here.
 *
 * @param response message received from the device
 */
128 OutboundQueueEntry pairRequest(final OfHeader response) {
129 // Explicitly 'long' to force unboxing before performing operations
130 final long xid = response.getXid().toJava();
// Responses whose XID lies outside this segment's window are only logged.
131 if (!xidInRange(xid)) {
132 LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
136 final int offset = (int) (xid - baseXid);
137 final OutboundQueueEntry entry = entries[offset];
// A response for an already-completed entry is logged and not applied here.
138 if (entry.isCompleted()) {
139 LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
143 if (entry.isBarrier()) {
144 // This has been a barrier -- make sure we complete all preceding requests.
145 // XXX: Barriers are expected to complete in one message.
146 // If this assumption is changed, this logic will need to be expanded
147 // to ensure that the requests implied by the barrier are reported as
148 // completed *after* the barrier.
149 LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid,
150 baseXid + lastBarrierOffset + 1, xid - 1);
// Order matters: cascade to the preceding entries using the old
// lastBarrierOffset, then advance it to this barrier.
151 completeRequests(offset);
152 lastBarrierOffset = offset;
// The barrier entry itself must complete; a false result trips the verify() below.
154 final boolean success = completeEntry(entry, response);
155 verify(success, "Barrier request failed to complete");
// Non-barrier entry: apply the response directly.
157 } else if (completeEntry(entry, response)) {
/**
 * Completes, with a {@code null} response, every not-yet-completed entry from
 * the slot after the last barrier up to but excluding {@code toOffset}.
 *
 * <p>NOTE(review): the body executed when {@code complete(null)} returns
 * {@code true} is not visible in this view.
 *
 * @param toOffset exclusive upper bound of the offsets to complete
 */
164 private void completeRequests(final int toOffset) {
165 for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
166 final OutboundQueueEntry entry = entries[i];
// isCompleted() is checked first, so complete(null) is only called on pending entries.
167 if (!entry.isCompleted() && entry.complete(null)) {
// NOTE(review): the declaration of the method enclosing this statement is not
// visible in this view. The call cascades completion from the last barrier to
// the end of the backing array.
174 completeRequests(entries.length);
/**
 * Walks the entries after the last barrier and examines each one's committed
 * and completed state.
 *
 * <p>NOTE(review): the bodies of both {@code if} statements and the return
 * statement are not visible in this view. Presumably the scan stops at the
 * first uncommitted entry, pending entries are failed with {@code cause}, and
 * the number of failed entries is returned -- confirm against the full file.
 *
 * @param cause exception describing why outstanding requests are being failed
 */
177 int failAll(final OutboundQueueException cause) {
// Entries at or before lastBarrierOffset are skipped by the loop's starting index.
179 for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
180 final OutboundQueueEntry entry = entries[i];
181 if (!entry.isCommitted()) {
185 if (!entry.isCompleted()) {
/**
 * Reports whether the completion counter has reached the segment size.
 *
 * @return {@code true} once {@code completeCount} is at least {@code entries.length}
 */
195 boolean isComplete() {
196 return completeCount >= entries.length;
// NOTE(review): the declaration of the method enclosing these statements and
// the loop body are not visible in this view. Presumably each entry is reset
// before the array is returned to the cache -- confirm.
200 for (final OutboundQueueEntry e : entries) {
// Hand the backing array to the shared cache behind a soft reference, so it
// can be reused by a later segment or reclaimed under memory pressure.
204 CACHE.offer(new QueueRef(REF_QUEUE, entries));