2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
11 import com.google.common.base.Preconditions;
12 import io.netty.channel.Channel;
13 import java.util.ArrayList;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
17 import javax.annotation.Nonnull;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 abstract class AbstractStackedOutboundQueue implements OutboundQueue {
// Logger shared by this class.
27 private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
// Atomic updater bound by name to the volatile long field "lastXid" declared below;
// startShutdown() uses it to advance that field with addAndGet.
29 protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater
30 .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
// Segment whose base XID and entries needsFlush() consults. The constructor
// initialises it with StackedSegment.create(0L).
32 @GuardedBy("unflushedSegments")
33 protected volatile StackedSegment firstSegment;
// This list is also the monitor object used by the synchronized blocks in
// startShutdown() and finishShutdown(), and the lock named by the @GuardedBy
// annotations in this class. The constructor seeds it with firstSegment.
34 @GuardedBy("unflushedSegments")
35 protected final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
// Segments iterated by lockedShutdownFlush(). The constructor seeds it with firstSegment.
36 @GuardedBy("unflushedSegments")
37 protected final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
// Starts at -1. Read directly by needsFlush() and advanced through LAST_XID_UPDATER.
39 private volatile long lastXid = -1;
// Has no initialiser, so it is null until startShutdown() stores a value;
// needsFlush() compares flushOffset against it only when it is non-null.
41 @GuardedBy("unflushedSegments")
42 protected Integer shutdownOffset;
44 // Accessed from Netty only
// Offset that needsFlush() adds to firstSegment's base XID and uses to index
// into firstSegment's entries.
45 protected int flushOffset;
// Manager handed to the constructor, which rejects null via Preconditions.checkNotNull.
47 protected final AbstractOutboundQueueManager<?, ?> manager;
// Creates a queue bound to the given manager.
// manager: must not be null; Preconditions.checkNotNull throws NullPointerException otherwise.
49 AbstractStackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
50 this.manager = Preconditions.checkNotNull(manager);
// Begin with a single segment obtained from StackedSegment.create(0L) ...
51 firstSegment = StackedSegment.create(0L);
// ... and register that same segment in both tracking lists.
52 uncompletedSegments.add(firstSegment);
53 unflushedSegments.add(firstSegment);
57 * Write some entries from the queue to the channel. Guaranteed to run
58 * in the corresponding EventLoop.
60 * @param channel Channel onto which we are writing
62 * @return Number of entries written out
// Abstract: the write logic itself is supplied by subclasses.
// NOTE(review): the `now` parameter is not described in the documentation above;
// presumably a timestamp supplied by the caller. Its clock source and units are
// not visible in this chunk - confirm against the implementations.
64 abstract int writeEntries(@Nonnull final Channel channel, final long now);
// Abstract hook that takes an OfHeader message and yields a boolean.
// NOTE(review): judging by the name, this presumably matches the message to a
// previously queued request and reports whether a match was found. Neither the
// matching rule nor the meaning of the return value is visible in this chunk -
// confirm against the implementations.
66 abstract boolean pairRequest(final OfHeader message);
// Decides whether there is something to flush: two guard checks run first, and
// if neither applies the result is whether the entry at flushOffset in
// firstSegment is committed.
// NOTE(review): the bodies of the two guard branches are not visible in this
// chunk; presumably each returns false - confirm against the full file.
68 boolean needsFlush() {
69 // flushOffset always points to the first entry, which can be changed only
70 // from Netty, so we are fine here.
// Guard 1: the absolute XID of the flush position (segment base XID plus
// flushOffset) is greater than lastXid.
71 if (firstSegment.getBaseXid() + flushOffset > lastXid) {
// Guard 2: a shutdown offset has been recorded and the flush position has
// reached or passed it. The null check comes first because shutdownOffset is
// a boxed Integer that stays null until startShutdown() assigns it.
75 if (shutdownOffset != null && flushOffset >= shutdownOffset) {
// Fall-through result: the committed state of the entry at the flush position.
79 return firstSegment.getEntry(flushOffset).isCommitted();
// Begins queue shutdown. While holding the unflushedSegments monitor it advances
// lastXid by StackedSegment.SEGMENT_SIZE, records shutdownOffset, and returns
// whatever lockedShutdownFlush() returns.
// channel: not referenced in the visible lines of this method.
// Returns: the long value produced by lockedShutdownFlush().
82 long startShutdown(final Channel channel) {
84 * We are dealing with a multi-threaded shutdown, as the user may still
85 * be reserving entries in the queue. We are executing in a netty thread,
86 * so neither flush nor barrier can be running, which is good news.
87 * We will eat up all the slots in the queue here and mark the offset first
88 * reserved offset and free up all the cached queues. We then schedule
89 * the flush task, which will deal with the rest of the shutdown process.
91 synchronized (unflushedSegments) {
92 // Increment the offset by the segment size, preventing fast path allocations,
93 // since we are holding the slow path lock, any reservations will see the queue
94 // in shutdown and fail accordingly.
// addAndGet returns the value AFTER the increment, so xid already includes
// the extra SEGMENT_SIZE.
95 final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
// Subtracting SEGMENT_SIZE again recovers the pre-increment lastXid; the
// stored offset is that value relative to firstSegment's base XID, narrowed
// from long to int by the cast.
96 shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
// Invoked while the unflushedSegments monitor is still held.
98 return lockedShutdownFlush();
// Runs another lockedShutdownFlush() pass under the unflushedSegments monitor
// and then reports whether flushing is finished.
// Returns: true when needsFlush() returns false, and false otherwise.
102 boolean finishShutdown() {
103 synchronized (unflushedSegments) {
// The count returned by lockedShutdownFlush() is deliberately ignored here.
104 lockedShutdownFlush();
// The result is the negation of needsFlush().
107 return !needsFlush();
110 @GuardedBy("unflushedSegments")
111 private long lockedShutdownFlush() {
115 final Iterator<StackedSegment> it = uncompletedSegments.iterator();
116 while (it.hasNext()) {
117 final StackedSegment segment = it.next();
119 entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
120 if (segment.isComplete()) {
121 LOG.trace("Cleared segment {}", segment);