c971c663af5596537d3e49f430862b858220215f
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedSegment.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.FinalizableReferenceQueue;
11 import com.google.common.base.FinalizableSoftReference;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import java.lang.ref.Reference;
16 import java.util.concurrent.ConcurrentLinkedDeque;
17 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
18 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 final class StackedSegment {
25     private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
26         QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
27             super(referent, queue);
28         }
29
30         @Override
31         public void finalizeReferent() {
32             CACHE.remove(this);
33         }
34     }
35
36     /**
37      * Size of each individual segment
38      */
39     static final int SEGMENT_SIZE = 4096;
40
41     private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
42     private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
43     private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
44
45     private final OutboundQueueEntry[] entries;
46     private final long baseXid;
47     private final long endXid;
48
49     // Updated from netty only
50     private int lastBarrierOffset = -1;
51     private int completeCount;
52
53     StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
54         this.baseXid = baseXid;
55         this.endXid = baseXid + SEGMENT_SIZE;
56         this.entries = Preconditions.checkNotNull(entries);
57     }
58
59     static StackedSegment create(final long baseXid) {
60         final StackedSegment ret;
61         for (;;) {
62             final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
63             if (item == null) {
64                 break;
65             }
66
67             final OutboundQueueEntry[] cached = item.get();
68             if (cached != null) {
69                 ret = new StackedSegment(baseXid, cached);
70                 LOG.trace("Reusing array {} in segment {}", cached, ret);
71                 return ret;
72             }
73         }
74
75         final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
76         for (int i = 0; i < SEGMENT_SIZE; ++i) {
77             entries[i] = new OutboundQueueEntry();
78         }
79
80         ret = new StackedSegment(baseXid, entries);
81         LOG.trace("Allocated new segment {}", ret);
82         return ret;
83     }
84
85     @Override
86     public String toString() {
87         return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid).add("completeCount", completeCount).toString();
88     }
89
90     long getBaseXid() {
91         return baseXid;
92     }
93
94     long getEndXid() {
95         return endXid;
96     }
97
98     OutboundQueueEntry getEntry(final int offset) {
99         return entries[offset];
100     }
101
102     private boolean xidInRange(final long xid) {
103         return xid < endXid && (xid >= baseXid || baseXid > endXid);
104     }
105
106     private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
107         if (response instanceof Error) {
108             final Error err = (Error)response;
109             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
110             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
111             return true;
112         }
113         return entry.complete(response);
114     }
115
116     OutboundQueueEntry findEntry(final long xid) {
117         if (! xidInRange(xid)) {
118             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
119             return null;
120         }
121         final int offset = (int)(xid - baseXid);
122         return entries[offset];
123     }
124
125     OutboundQueueEntry pairRequest(final OfHeader response) {
126         // Explicitly 'long' to force unboxing before performing operations
127         final long xid = response.getXid();
128         if (!xidInRange(xid)) {
129             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
130             return null;
131         }
132
133         final int offset = (int) (xid - baseXid);
134         final OutboundQueueEntry entry = entries[offset];
135         if (entry.isCompleted()) {
136             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
137             return null;
138         }
139
140         if (entry.isBarrier()) {
141             // This has been a barrier -- make sure we complete all preceding requests.
142             // XXX: Barriers are expected to complete in one message.
143             //      If this assumption is changed, this logic will need to be expanded
144             //      to ensure that the requests implied by the barrier are reported as
145             //      completed *after* the barrier.
146             LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
147             completeRequests(offset);
148             lastBarrierOffset = offset;
149
150             final boolean success = completeEntry(entry, response);
151             Verify.verify(success, "Barrier request failed to complete");
152             completeCount++;
153         } else if (completeEntry(entry, response)) {
154             completeCount++;
155         }
156
157         return entry;
158     }
159
160     private void completeRequests(final int toOffset) {
161         for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
162             final OutboundQueueEntry entry = entries[i];
163             if (!entry.isCompleted() && entry.complete(null)) {
164                 completeCount++;
165             }
166         }
167     }
168
169     void completeAll() {
170         completeRequests(entries.length);
171     }
172
173     int failAll(final OutboundQueueException cause) {
174         int ret = 0;
175         for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
176             final OutboundQueueEntry entry = entries[i];
177             if (!entry.isCommitted()) {
178                 break;
179             }
180
181             if (!entry.isCompleted()) {
182                 entry.fail(cause);
183                 ret++;
184             }
185         }
186
187         return ret;
188     }
189
190     boolean isComplete() {
191         return completeCount >= entries.length;
192     }
193
194     void recycle() {
195         for (final OutboundQueueEntry e : entries) {
196             e.reset();
197         }
198
199         CACHE.offer(new QueueRef(REF_QUEUE, entries));
200     }
201 }