2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11 import com.google.common.util.concurrent.FutureCallback;
13 import io.netty.channel.Channel;
15 import java.util.function.Function;
17 import javax.annotation.Nonnull;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * Class is designed for stacking Statistics and propagating immediate response for all
30 public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue {
32 private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueueNoBarrier.class);
34 StackedOutboundQueueNoBarrier(final AbstractOutboundQueueManager<?, ?> manager) {
39 * This method is expected to be called from multiple threads concurrently
// Commits {@code message} into the queue entry looked up for {@code xid} and then asks the
// manager to make sure flushing is running.
//
// xid                 - transaction id used to look up the entry via getEntry(xid)
// message             - message handed to the entry's commit(...) call
// callback            - completion callback; for FlowModInput messages it is invoked right here
//                       with onSuccess(null) and is NOT passed on to the entry
// isCompletedFunction - forwarded unchanged to the entry's commit(...) call
//
// NOTE(review): the branch delimiters (else / closing braces) are not visible in this view;
// the second commit(...) call is presumably the else-branch of the instanceof check - confirm.
42 public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
43 final Function<OfHeader, Boolean> isCompletedFunction) {
44 final OutboundQueueEntry entry = getEntry(xid);
// FlowModInput: report success to the caller immediately, before the entry is committed,
// and commit the entry without a callback so nothing is reported a second time.
46 if (message instanceof FlowModInput) {
// NOTE(review): callback is dereferenced without a null check - confirm callers never pass null here.
47 callback.onSuccess(null);
48 entry.commit(message, null, isCompletedFunction);
// Any other message type: the callback travels with the entry (presumably the else-branch, see note above).
50 entry.commit(message, callback, isCompletedFunction);
53 LOG.trace("Queue {} committed XID {}", this, xid);
// Ask the manager to ensure a flush is in progress now that a new entry has been committed.
54 manager.ensureFlushing();
58 int writeEntries(@Nonnull final Channel channel, final long now) {
60 StackedSegment segment = firstSegment;
63 while (channel.isWritable()) {
64 final OutboundQueueEntry entry = segment.getEntry(flushOffset);
65 if (!entry.isCommitted()) {
66 LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid()
67 + flushOffset, segment, flushOffset);
71 LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
72 final OfHeader message = entry.takeMessage();
76 if (message != null) {
77 manager.writeMessage(message, now);
82 if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
84 * Slow path: purge the current segment unless it's the last one.
85 * If it is, we leave it for replacement when a new reservation
87 * This costs us two slow paths, but hey, this should be very rare,
88 * so let's keep things simple.
90 synchronized (unflushedSegments) {
91 LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
93 // We may have raced ahead of reservation code and need to allocate a segment
94 ensureSegment(segment, flushOffset);
96 // Remove the segment, update the firstSegment and reset flushOffset
97 final StackedSegment oldSegment = unflushedSegments.remove(0);
98 oldSegment.completeAll();
99 uncompletedSegments.remove(oldSegment);
100 oldSegment.recycle();
102 // Reset the first segment and add it to the uncompleted list
103 segment = unflushedSegments.get(0);
104 uncompletedSegments.add(segment);
106 // Update the shutdown offset
107 if (shutdownOffset != null) {
108 shutdownOffset -= StackedSegment.SEGMENT_SIZE;
111 // Allow reservations back on the fast path by publishing the new first segment
112 firstSegment = segment;
115 LOG.debug("Queue {} flush moved to segment {}", this, segment);