X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowjava%2Fopenflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FAbstractStackedOutboundQueue.java;h=8f9e5e47090e67e9164a68189888ba993cec116f;hb=a425274a36f7ea227ba3ec7181ee646b5fa50d40;hp=b4356ee41ef03d88bcea34a5db87137fab95d8a5;hpb=34a07eabaeccb03d834359b99694f79b89e37583;p=openflowplugin.git diff --git a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java index b4356ee41e..8f9e5e4709 100644 --- a/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java +++ b/openflowjava/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -5,37 +5,32 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowjava.protocol.impl.core.connection; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.util.concurrent.FutureCallback; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; +import com.google.common.util.concurrent.FutureCallback; import io.netty.channel.Channel; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.function.Function; - -import javax.annotation.Nonnull; -import javax.annotation.concurrent.GuardedBy; - +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractStackedOutboundQueue implements OutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class); - protected static final AtomicLongFieldUpdater LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater - .newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); + protected static final AtomicLongFieldUpdater LAST_XID_OFFSET_UPDATER = + AtomicLongFieldUpdater.newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); @GuardedBy("unflushedSegments") protected volatile StackedSegment firstSegment; @@ -56,7 +51,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { protected final AbstractOutboundQueueManager manager; AbstractStackedOutboundQueue(final AbstractOutboundQueueManager manager) { - this.manager = Preconditions.checkNotNull(manager); + this.manager = requireNonNull(manager); firstSegment = StackedSegment.create(0L); uncompletedSegments.add(firstSegment); unflushedSegments.add(firstSegment); @@ -67,13 +62,15 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { commitEntry(xid, message, callback, OutboundQueueEntry.DEFAULT_IS_COMPLETE); } - @GuardedBy("unflushedSegments") + @Holding("unflushedSegments") protected void ensureSegment(final StackedSegment first, final int offset) { final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE; - LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size()); + LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, + unflushedSegments.size()); for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) { - final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i)); + final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + + StackedSegment.SEGMENT_SIZE * (long)i); LOG.debug("Adding segment {}", newSegment); unflushedSegments.add(newSegment); } @@ -106,7 +103,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { // Ensure we have the appropriate segment for the specified XID final StackedSegment slowSegment = firstSegment; final int slowOffset = (int) (xid - slowSegment.getBaseXid()); - Verify.verify(slowOffset >= 0); + verify(slowOffset >= 0); // Now, we let's see if we need to allocate a new segment ensureSegment(slowSegment, slowOffset); @@ -127,10 +124,10 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { * in the corresponding EventLoop. * * @param channel Channel onto which we are writing - * @param now + * @param now time stamp * @return Number of entries written out */ - int writeEntries(@Nonnull final Channel channel, final long now) { + int writeEntries(@NonNull final Channel channel, final long now) { // Local cache StackedSegment segment = firstSegment; int entries = 0; @@ -138,7 +135,8 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { while (channel.isWritable()) { final OutboundQueueEntry entry = segment.getEntry(flushOffset); if (!entry.isCommitted()) { - LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset); + LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + + flushOffset, segment, flushOffset); break; } @@ -279,7 +277,8 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { /** * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed - * and fails all not completed entries (if in final phase) + * and fails all not completed entries (if in final phase). + * * @param channel netty channel * @return true if in final phase, false if a flush is needed */ @@ -303,9 +302,10 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { protected OutboundQueueEntry getEntry(final Long xid) { final StackedSegment fastSegment = firstSegment; final long calcOffset = xid - fastSegment.getBaseXid(); - Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid()); + checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", + xid, fastSegment.getBaseXid()); - Verify.verify(calcOffset <= Integer.MAX_VALUE); + verify(calcOffset <= Integer.MAX_VALUE); final int fastOffset = (int) calcOffset; if (fastOffset >= StackedSegment.SEGMENT_SIZE) { @@ -316,7 +316,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { synchronized (unflushedSegments) { final StackedSegment slowSegment = firstSegment; final long slowCalcOffset = xid - slowSegment.getBaseXid(); - Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE); + verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE); slowOffset = (int) slowCalcOffset; LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset); @@ -324,14 +324,16 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { } final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE; - LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset); + LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, + xid, slowOffset, segment, segOffset); return segment.getEntry(segOffset); } return fastSegment.getEntry(fastOffset); } /** - * Fails not completed entries in segments and frees completed segments + * Fails not completed entries in segments and frees completed segments. + * * @param iterator list of segments to be failed * @return number of failed entries */ @@ -352,5 +354,4 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { return entries; } - }