Speed up StackedOutboundQueue.reserve()
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedSegment.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.FinalizableReferenceQueue;
11 import com.google.common.base.FinalizableSoftReference;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import java.lang.ref.Reference;
16 import java.util.concurrent.ConcurrentLinkedDeque;
17 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
18 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 final class StackedSegment {
25     private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
26         QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
27             super(referent, queue);
28         }
29
30         @Override
31         public void finalizeReferent() {
32             CACHE.remove(this);
33         }
34     }
35
36     /**
37      * Size of each individual segment
38      */
39     static final int SEGMENT_SIZE = 4096;
40
41     private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
42     private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
43     private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
44
45     private final OutboundQueueEntry[] entries;
46     private final long baseXid;
47     private final long endXid;
48
49     // Updated from netty only
50     private int lastBarrierOffset = -1;
51     private int completeCount;
52
53     StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
54         this.baseXid = baseXid;
55         this.endXid = baseXid + SEGMENT_SIZE;
56         this.entries = Preconditions.checkNotNull(entries);
57     }
58
59     static StackedSegment create(final long baseXid) {
60         final StackedSegment ret;
61         for (;;) {
62             final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
63             if (item == null) {
64                 break;
65             }
66
67             final OutboundQueueEntry[] cached = item.get();
68             if (cached != null) {
69                 ret = new StackedSegment(baseXid, cached);
70                 LOG.trace("Reusing array {} in segment {}", cached, ret);
71                 return ret;
72             }
73         }
74
75         final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
76         for (int i = 0; i < SEGMENT_SIZE; ++i) {
77             entries[i] = new OutboundQueueEntry();
78         }
79
80         ret = new StackedSegment(baseXid, entries);
81         LOG.trace("Allocated new segment {}", ret);
82         return ret;
83     }
84
85     @Override
86     public String toString() {
87         return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid).add("completeCount", completeCount).toString();
88     }
89
90     long getBaseXid() {
91         return baseXid;
92     }
93
94     long getEndXid() {
95         return endXid;
96     }
97
98     OutboundQueueEntry getEntry(final int offset) {
99         return entries[offset];
100     }
101
102     private boolean xidInRange(final long xid) {
103         return xid < endXid && (xid >= baseXid || baseXid > endXid);
104     }
105
106     private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
107         if (response instanceof Error) {
108             final Error err = (Error)response;
109             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
110             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
111             return true;
112         } else {
113             return entry.complete(response);
114         }
115     }
116
117     OutboundQueueEntry pairRequest(final OfHeader response) {
118         // Explicitly 'long' to force unboxing before performing operations
119         final long xid = response.getXid();
120         if (!xidInRange(xid)) {
121             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
122             return null;
123         }
124
125         final int offset = (int)(xid - baseXid);
126         final OutboundQueueEntry entry = entries[offset];
127         if (entry.isCompleted()) {
128             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
129             return null;
130         }
131
132         if (entry.isBarrier()) {
133             // This has been a barrier -- make sure we complete all preceding requests.
134             // XXX: Barriers are expected to complete in one message.
135             //      If this assumption is changed, this logic will need to be expanded
136             //      to ensure that the requests implied by the barrier are reported as
137             //      completed *after* the barrier.
138             LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
139             completeRequests(offset);
140             lastBarrierOffset = offset;
141
142             final boolean success = completeEntry(entry, response);
143             Verify.verify(success, "Barrier request failed to complete");
144             completeCount++;
145         } else if (completeEntry(entry, response)) {
146             completeCount++;
147         }
148
149         return entry;
150     }
151
152     private void completeRequests(final int toOffset) {
153         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
154             final OutboundQueueEntry entry = entries[i];
155             if (!entry.isCompleted() && entry.complete(null)) {
156                 completeCount++;
157             }
158         }
159     }
160
161     void completeAll() {
162         completeRequests(entries.length);
163     }
164
165     int failAll(final OutboundQueueException cause) {
166         int ret = 0;
167         for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
168             final OutboundQueueEntry entry = entries[i];
169             if (!entry.isCommitted()) {
170                 break;
171             }
172
173             if (!entry.isCompleted()) {
174                 entry.fail(cause);
175                 ret++;
176             }
177         }
178
179         return ret;
180     }
181
182     boolean isComplete() {
183         return completeCount >= entries.length;
184     }
185
186     void recycle() {
187         for (OutboundQueueEntry e : entries) {
188             e.reset();
189         }
190
191         CACHE.offer(new QueueRef(REF_QUEUE, entries));
192     }
193 }