* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowjava.protocol.impl.core.connection;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.util.concurrent.FutureCallback;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+import com.google.common.util.concurrent.FutureCallback;
import io.netty.channel.Channel;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.function.Function;
-
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.GuardedBy;
-
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractStackedOutboundQueue implements OutboundQueue {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
- protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater
- .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
+ protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_OFFSET_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
@GuardedBy("unflushedSegments")
protected volatile StackedSegment firstSegment;
protected final AbstractOutboundQueueManager<?, ?> manager;
AbstractStackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
- this.manager = Preconditions.checkNotNull(manager);
+ this.manager = requireNonNull(manager);
firstSegment = StackedSegment.create(0L);
uncompletedSegments.add(firstSegment);
unflushedSegments.add(firstSegment);
commitEntry(xid, message, callback, OutboundQueueEntry.DEFAULT_IS_COMPLETE);
}
- @GuardedBy("unflushedSegments")
+ @Holding("unflushedSegments")
protected void ensureSegment(final StackedSegment first, final int offset) {
final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
- LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
+ LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset,
+ unflushedSegments.size());
for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
- final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
+ final StackedSegment newSegment = StackedSegment.create(first.getBaseXid()
+ + StackedSegment.SEGMENT_SIZE * (long)i);
LOG.debug("Adding segment {}", newSegment);
unflushedSegments.add(newSegment);
}
// Ensure we have the appropriate segment for the specified XID
final StackedSegment slowSegment = firstSegment;
final int slowOffset = (int) (xid - slowSegment.getBaseXid());
- Verify.verify(slowOffset >= 0);
+ verify(slowOffset >= 0);
// Now, we let's see if we need to allocate a new segment
ensureSegment(slowSegment, slowOffset);
* in the corresponding EventLoop.
*
* @param channel Channel onto which we are writing
- * @param now
+ * @param now time stamp
* @return Number of entries written out
*/
- int writeEntries(@Nonnull final Channel channel, final long now) {
+ int writeEntries(@NonNull final Channel channel, final long now) {
// Local cache
StackedSegment segment = firstSegment;
int entries = 0;
while (channel.isWritable()) {
final OutboundQueueEntry entry = segment.getEntry(flushOffset);
if (!entry.isCommitted()) {
- LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
+ LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid()
+ + flushOffset, segment, flushOffset);
break;
}
/**
* Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
- * and fails all not completed entries (if in final phase)
+ * and fails all not completed entries (if in final phase).
+ *
* @param channel netty channel
* @return true if in final phase, false if a flush is needed
*/
protected OutboundQueueEntry getEntry(final Long xid) {
final StackedSegment fastSegment = firstSegment;
final long calcOffset = xid - fastSegment.getBaseXid();
- Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
+ checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s",
+ xid, fastSegment.getBaseXid());
- Verify.verify(calcOffset <= Integer.MAX_VALUE);
+ verify(calcOffset <= Integer.MAX_VALUE);
final int fastOffset = (int) calcOffset;
if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
synchronized (unflushedSegments) {
final StackedSegment slowSegment = firstSegment;
final long slowCalcOffset = xid - slowSegment.getBaseXid();
- Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
+ verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
slowOffset = (int) slowCalcOffset;
LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
}
final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
- LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+ LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this,
+ xid, slowOffset, segment, segOffset);
return segment.getEntry(segOffset);
}
return fastSegment.getEntry(fastOffset);
}
/**
- * Fails not completed entries in segments and frees completed segments
+ * Fails not completed entries in segments and frees completed segments.
+ *
* @param iterator list of segments to be failed
* @return number of failed entries
*/
return entries;
}
-
}