Merge "Use String(byte[], Charset)"
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedSegment.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.base.FinalizableReferenceQueue;
14 import com.google.common.base.FinalizableSoftReference;
15 import com.google.common.base.MoreObjects;
16 import java.lang.ref.Reference;
17 import java.util.concurrent.ConcurrentLinkedDeque;
18 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 final class StackedSegment {
26     private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
27         QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
28             super(referent, queue);
29         }
30
31         @Override
32         public void finalizeReferent() {
33             CACHE.remove(this);
34         }
35     }
36
37     /**
38      * Size of each individual segment.
39      */
40     static final int SEGMENT_SIZE = 4096;
41
42     private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
43     private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
44     private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
45
46     private final OutboundQueueEntry[] entries;
47     private final long baseXid;
48     private final long endXid;
49
50     // Updated from netty only
51     private int lastBarrierOffset = -1;
52     private int completeCount;
53
54     StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
55         this.baseXid = baseXid;
56         this.endXid = baseXid + SEGMENT_SIZE;
57         this.entries = requireNonNull(entries);
58     }
59
60     static StackedSegment create(final long baseXid) {
61         final StackedSegment ret;
62         for (;;) {
63             final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
64             if (item == null) {
65                 break;
66             }
67
68             final OutboundQueueEntry[] cached = item.get();
69             if (cached != null) {
70                 ret = new StackedSegment(baseXid, cached);
71                 LOG.trace("Reusing array {} in segment {}", cached, ret);
72                 return ret;
73             }
74         }
75
76         final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
77         for (int i = 0; i < SEGMENT_SIZE; ++i) {
78             entries[i] = new OutboundQueueEntry();
79         }
80
81         ret = new StackedSegment(baseXid, entries);
82         LOG.trace("Allocated new segment {}", ret);
83         return ret;
84     }
85
86     @Override
87     public String toString() {
88         return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid)
89                 .add("completeCount", completeCount).toString();
90     }
91
92     long getBaseXid() {
93         return baseXid;
94     }
95
96     long getEndXid() {
97         return endXid;
98     }
99
100     OutboundQueueEntry getEntry(final int offset) {
101         return entries[offset];
102     }
103
104     private boolean xidInRange(final long xid) {
105         return xid < endXid && (xid >= baseXid || baseXid > endXid);
106     }
107
108     private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
109         if (response instanceof Error) {
110             final Error err = (Error)response;
111             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(),
112                     err.getCodeString());
113             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
114             return true;
115         }
116         return entry.complete(response);
117     }
118
119     OutboundQueueEntry findEntry(final long xid) {
120         if (! xidInRange(xid)) {
121             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
122             return null;
123         }
124         final int offset = (int)(xid - baseXid);
125         return entries[offset];
126     }
127
128     OutboundQueueEntry pairRequest(final OfHeader response) {
129         // Explicitly 'long' to force unboxing before performing operations
130         final long xid = response.getXid().toJava();
131         if (!xidInRange(xid)) {
132             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
133             return null;
134         }
135
136         final int offset = (int) (xid - baseXid);
137         final OutboundQueueEntry entry = entries[offset];
138         if (entry.isCompleted()) {
139             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
140             return null;
141         }
142
143         if (entry.isBarrier()) {
144             // This has been a barrier -- make sure we complete all preceding requests.
145             // XXX: Barriers are expected to complete in one message.
146             //      If this assumption is changed, this logic will need to be expanded
147             //      to ensure that the requests implied by the barrier are reported as
148             //      completed *after* the barrier.
149             LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid,
150                     baseXid + lastBarrierOffset + 1, xid - 1);
151             completeRequests(offset);
152             lastBarrierOffset = offset;
153
154             final boolean success = completeEntry(entry, response);
155             verify(success, "Barrier request failed to complete");
156             completeCount++;
157         } else if (completeEntry(entry, response)) {
158             completeCount++;
159         }
160
161         return entry;
162     }
163
164     private void completeRequests(final int toOffset) {
165         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
166             final OutboundQueueEntry entry = entries[i];
167             if (!entry.isCompleted() && entry.complete(null)) {
168                 completeCount++;
169             }
170         }
171     }
172
173     void completeAll() {
174         completeRequests(entries.length);
175     }
176
177     int failAll(final OutboundQueueException cause) {
178         int ret = 0;
179         for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
180             final OutboundQueueEntry entry = entries[i];
181             if (!entry.isCommitted()) {
182                 break;
183             }
184
185             if (!entry.isCompleted()) {
186                 entry.fail(cause);
187                 completeCount++;
188                 ret++;
189             }
190         }
191
192         return ret;
193     }
194
195     boolean isComplete() {
196         return completeCount >= entries.length;
197     }
198
199     void recycle() {
200         for (final OutboundQueueEntry e : entries) {
201             e.reset();
202         }
203
204         CACHE.offer(new QueueRef(REF_QUEUE, entries));
205     }
206 }