2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.FinalizableReferenceQueue;
11 import com.google.common.base.FinalizableSoftReference;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import java.lang.ref.Reference;
16 import java.util.concurrent.ConcurrentLinkedDeque;
17 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
18 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 final class StackedSegment {
25 private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
26 QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
27 super(referent, queue);
31 public void finalizeReferent() {
37 * Size of each individual segment
39 static final int SEGMENT_SIZE = 4096;
41 private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
42 private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
43 private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
45 private final OutboundQueueEntry[] entries;
46 private final long baseXid;
47 private final long endXid;
49 // Updated from netty only
50 private int lastBarrierOffset = -1;
51 private int completeCount;
53 StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
54 this.baseXid = baseXid;
55 this.endXid = baseXid + SEGMENT_SIZE;
56 this.entries = Preconditions.checkNotNull(entries);
59 static StackedSegment create(final long baseXid) {
60 final StackedSegment ret;
62 final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
67 final OutboundQueueEntry[] cached = item.get();
69 ret = new StackedSegment(baseXid, cached);
70 LOG.trace("Reusing array {} in segment {}", cached, ret);
75 final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
76 for (int i = 0; i < SEGMENT_SIZE; ++i) {
77 entries[i] = new OutboundQueueEntry();
80 ret = new StackedSegment(baseXid, entries);
81 LOG.trace("Allocated new segment {}", ret);
86 public String toString() {
87 return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid).add("completeCount", completeCount).toString();
98 OutboundQueueEntry getEntry(final int offset) {
99 return entries[offset];
102 private boolean xidInRange(final long xid) {
103 return xid < endXid && (xid >= baseXid || baseXid > endXid);
106 private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
107 if (response instanceof Error) {
108 final Error err = (Error)response;
109 LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
110 entry.fail(new DeviceRequestFailedException("Device-side failure", err));
113 return entry.complete(response);
116 OutboundQueueEntry findEntry(final long xid) {
117 if (! xidInRange(xid)) {
118 LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
121 final int offset = (int)(xid - baseXid);
122 return entries[offset];
125 OutboundQueueEntry pairRequest(final OfHeader response) {
126 // Explicitly 'long' to force unboxing before performing operations
127 final long xid = response.getXid();
128 if (!xidInRange(xid)) {
129 LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
133 final int offset = (int) (xid - baseXid);
134 final OutboundQueueEntry entry = entries[offset];
135 if (entry.isCompleted()) {
136 LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
140 if (entry.isBarrier()) {
141 // This has been a barrier -- make sure we complete all preceding requests.
142 // XXX: Barriers are expected to complete in one message.
143 // If this assumption is changed, this logic will need to be expanded
144 // to ensure that the requests implied by the barrier are reported as
145 // completed *after* the barrier.
146 LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
147 completeRequests(offset);
148 lastBarrierOffset = offset;
150 final boolean success = completeEntry(entry, response);
151 Verify.verify(success, "Barrier request failed to complete");
153 } else if (completeEntry(entry, response)) {
160 private void completeRequests(final int toOffset) {
161 for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
162 final OutboundQueueEntry entry = entries[i];
163 if (!entry.isCompleted() && entry.complete(null)) {
170 completeRequests(entries.length);
173 int failAll(final OutboundQueueException cause) {
175 for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
176 final OutboundQueueEntry entry = entries[i];
177 if (!entry.isCommitted()) {
181 if (!entry.isCompleted()) {
191 boolean isComplete() {
192 return completeCount >= entries.length;
196 for (final OutboundQueueEntry e : entries) {
200 CACHE.offer(new QueueRef(REF_QUEUE, entries));