Use String(byte[], Charset)
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedSegment.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.FinalizableReferenceQueue;
11 import com.google.common.base.FinalizableSoftReference;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import java.lang.ref.Reference;
16 import java.util.concurrent.ConcurrentLinkedDeque;
17 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
18 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 final class StackedSegment {
25     private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
26         QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
27             super(referent, queue);
28         }
29
30         @Override
31         public void finalizeReferent() {
32             CACHE.remove(this);
33         }
34     }
35
36     /**
37      * Size of each individual segment.
38      */
39     static final int SEGMENT_SIZE = 4096;
40
41     private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
42     private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
43     private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
44
45     private final OutboundQueueEntry[] entries;
46     private final long baseXid;
47     private final long endXid;
48
49     // Updated from netty only
50     private int lastBarrierOffset = -1;
51     private int completeCount;
52
53     StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
54         this.baseXid = baseXid;
55         this.endXid = baseXid + SEGMENT_SIZE;
56         this.entries = Preconditions.checkNotNull(entries);
57     }
58
59     static StackedSegment create(final long baseXid) {
60         final StackedSegment ret;
61         for (;;) {
62             final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
63             if (item == null) {
64                 break;
65             }
66
67             final OutboundQueueEntry[] cached = item.get();
68             if (cached != null) {
69                 ret = new StackedSegment(baseXid, cached);
70                 LOG.trace("Reusing array {} in segment {}", cached, ret);
71                 return ret;
72             }
73         }
74
75         final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
76         for (int i = 0; i < SEGMENT_SIZE; ++i) {
77             entries[i] = new OutboundQueueEntry();
78         }
79
80         ret = new StackedSegment(baseXid, entries);
81         LOG.trace("Allocated new segment {}", ret);
82         return ret;
83     }
84
85     @Override
86     public String toString() {
87         return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid)
88                 .add("completeCount", completeCount).toString();
89     }
90
91     long getBaseXid() {
92         return baseXid;
93     }
94
95     long getEndXid() {
96         return endXid;
97     }
98
99     OutboundQueueEntry getEntry(final int offset) {
100         return entries[offset];
101     }
102
103     private boolean xidInRange(final long xid) {
104         return xid < endXid && (xid >= baseXid || baseXid > endXid);
105     }
106
107     private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
108         if (response instanceof Error) {
109             final Error err = (Error)response;
110             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(),
111                     err.getCodeString());
112             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
113             return true;
114         }
115         return entry.complete(response);
116     }
117
118     OutboundQueueEntry findEntry(final long xid) {
119         if (! xidInRange(xid)) {
120             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
121             return null;
122         }
123         final int offset = (int)(xid - baseXid);
124         return entries[offset];
125     }
126
127     OutboundQueueEntry pairRequest(final OfHeader response) {
128         // Explicitly 'long' to force unboxing before performing operations
129         final long xid = response.getXid().toJava();
130         if (!xidInRange(xid)) {
131             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
132             return null;
133         }
134
135         final int offset = (int) (xid - baseXid);
136         final OutboundQueueEntry entry = entries[offset];
137         if (entry.isCompleted()) {
138             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
139             return null;
140         }
141
142         if (entry.isBarrier()) {
143             // This has been a barrier -- make sure we complete all preceding requests.
144             // XXX: Barriers are expected to complete in one message.
145             //      If this assumption is changed, this logic will need to be expanded
146             //      to ensure that the requests implied by the barrier are reported as
147             //      completed *after* the barrier.
148             LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid,
149                     baseXid + lastBarrierOffset + 1, xid - 1);
150             completeRequests(offset);
151             lastBarrierOffset = offset;
152
153             final boolean success = completeEntry(entry, response);
154             Verify.verify(success, "Barrier request failed to complete");
155             completeCount++;
156         } else if (completeEntry(entry, response)) {
157             completeCount++;
158         }
159
160         return entry;
161     }
162
163     private void completeRequests(final int toOffset) {
164         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
165             final OutboundQueueEntry entry = entries[i];
166             if (!entry.isCompleted() && entry.complete(null)) {
167                 completeCount++;
168             }
169         }
170     }
171
172     void completeAll() {
173         completeRequests(entries.length);
174     }
175
176     int failAll(final OutboundQueueException cause) {
177         int ret = 0;
178         for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
179             final OutboundQueueEntry entry = entries[i];
180             if (!entry.isCommitted()) {
181                 break;
182             }
183
184             if (!entry.isCompleted()) {
185                 entry.fail(cause);
186                 completeCount++;
187                 ret++;
188             }
189         }
190
191         return ret;
192     }
193
194     boolean isComplete() {
195         return completeCount >= entries.length;
196     }
197
198     void recycle() {
199         for (final OutboundQueueEntry e : entries) {
200             e.reset();
201         }
202
203         CACHE.offer(new QueueRef(REF_QUEUE, entries));
204     }
205 }