final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
- final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+ final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, OutboundQueueCache.getInstance().getSlice(maxQueueDepth), maxBarrierNanos);
outputManager = ret;
channel.pipeline().addLast(outputManager);
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import java.util.HashMap;
+import java.util.Map;
+
+final class OutboundQueueCache {
+ private static final OutboundQueueCache INSTANCE = new OutboundQueueCache();
+
+ private final Map<Integer, OutboundQueueCacheSlice> slices = new HashMap<>();
+
+ private OutboundQueueCache() {
+
+ }
+
+ static OutboundQueueCache getInstance() {
+ return INSTANCE;
+ }
+
+ synchronized OutboundQueueCacheSlice getSlice(final int queueSize) {
+ final OutboundQueueCacheSlice oldSlice = slices.get(queueSize);
+ if (oldSlice != null) {
+ oldSlice.incRef();
+ return oldSlice;
+ }
+
+ final OutboundQueueCacheSlice newSlice = new OutboundQueueCacheSlice(queueSize);
+ slices.put(queueSize, newSlice);
+ return newSlice;
+ }
+
+ synchronized void putSlice(final OutboundQueueCacheSlice slice) {
+ if (slice.decRef()) {
+ slices.remove(slice.getQueueSize());
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.FinalizableSoftReference;
+import com.google.common.base.Preconditions;
+import java.lang.ref.Reference;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueCacheSlice {
+ private static final class QueueRef extends FinalizableSoftReference<OutboundQueueImpl> {
+ private final Collection<?> cache;
+
+ protected QueueRef(final FinalizableReferenceQueue queue, final Collection<?> cache, final OutboundQueueImpl referent) {
+ super(referent, queue);
+ this.cache = Preconditions.checkNotNull(cache);
+ }
+
+ @Override
+ public void finalizeReferent() {
+ cache.remove(this);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueCacheSlice.class);
+ private final FinalizableReferenceQueue refQueue = new FinalizableReferenceQueue();
+
+ private final ConcurrentLinkedDeque<QueueRef> cache = new ConcurrentLinkedDeque<>();
+ private final int queueSize;
+ private int refCount = 1;
+
+ OutboundQueueCacheSlice(final int queueSize) {
+ Preconditions.checkArgument(queueSize >= 1);
+ this.queueSize = queueSize;
+ }
+
+ void remove(final QueueRef queueRef) {
+ cache.remove(queueRef);
+ }
+
+ boolean decRef() {
+ if (--refCount == 0) {
+ refQueue.close();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void incRef() {
+ refCount++;
+ }
+
+ int getQueueSize() {
+ return queueSize;
+ }
+
+ OutboundQueueImpl getQueue(final OutboundQueueManager<?> manager, final long baseXid) {
+ final OutboundQueueImpl ret;
+ OutboundQueueImpl cached = null;
+ for (;;) {
+ final Reference<OutboundQueueImpl> item = cache.pollLast();
+ if (item == null) {
+ break;
+ }
+
+ cached = item.get();
+ if (cached != null) {
+ ret = cached.reuse(manager, baseXid);
+ LOG.trace("Reusing queue {} as {} on manager {}", cached, ret, manager);
+ return ret;
+ }
+ }
+
+ ret = new OutboundQueueImpl(manager, baseXid, queueSize + 1);
+ LOG.trace("Allocated new queue {} on manager {}", ret, manager);
+ return ret;
+ }
+
+ void putQueue(final OutboundQueueImpl queue) {
+ queue.retire();
+ if (cache.offer(new QueueRef(refQueue, cache, queue))) {
+ LOG.trace("Saving queue {} for later reuse", queue);
+ } else {
+ LOG.trace("Queue {} thrown away", queue);
+ }
+ }
+}
}
}
- OutboundQueueImpl reuse(final long baseXid) {
+ OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
return new OutboundQueueImpl(manager, baseXid, queue);
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
*/
private static final int WORKTIME_RECHECK_MSGS = 64;
- /**
- * We maintain a cache of this many previous queues for later reuse.
- */
- private static final int QUEUE_CACHE_SIZE = 4;
-
- private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
private final AtomicBoolean flushScheduled = new AtomicBoolean();
private final ConnectionAdapterImpl parent;
private final InetSocketAddress address;
private final long maxBarrierNanos;
private final long maxWorkTime;
- private final int queueSize;
private final T handler;
// Updated from netty only
private long lastBarrierNanos = System.nanoTime();
+ private OutboundQueueCacheSlice slice;
private OutboundQueueImpl currentQueue;
private boolean barrierTimerEnabled;
private int nonBarrierMessages;
};
OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
- final int queueSize, final long maxBarrierNanos) {
+ final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
this.parent = Preconditions.checkNotNull(parent);
this.handler = Preconditions.checkNotNull(handler);
- Preconditions.checkArgument(queueSize > 0);
- this.queueSize = queueSize;
+ this.slice = Preconditions.checkNotNull(slice);
Preconditions.checkArgument(maxBarrierNanos > 0);
this.maxBarrierNanos = maxBarrierNanos;
this.address = address;
this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
- LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+ LOG.debug("Queue manager instantiated with queue slice {}", slice);
createQueue();
}
@Override
public void close() {
handler.onConnectionQueueChanged(null);
- }
-
- private void retireQueue(final OutboundQueueImpl queue) {
- if (queueCache.offer(queue)) {
- queue.retire();
- LOG.trace("Saving queue {} for later reuse", queue);
- } else {
- LOG.trace("Queue {} thrown away", queue);
+ if (slice != null) {
+ slice.decRef();
+ slice = null;
}
}
private void createQueue() {
final long baseXid = lastXid;
- lastXid += queueSize + 1;
-
- final OutboundQueueImpl cached = queueCache.poll();
- final OutboundQueueImpl queue;
- if (cached != null) {
- queue = cached.reuse(baseXid);
- LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
- } else {
- queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
- LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
- }
+ lastXid += slice.getQueueSize() + 1;
+ final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
activeQueues.add(queue);
currentQueue = queue;
handler.onConnectionQueueChanged(queue);
lastBarrierNanos = now;
} else {
nonBarrierMessages++;
- if (nonBarrierMessages >= queueSize) {
+ if (nonBarrierMessages >= slice.getQueueSize()) {
LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
scheduleBarrierMessage();
} else if (!barrierTimerEnabled) {
LOG.trace("Queue {} is implied finished", q);
q.completeAll();
it.remove();
- retireQueue(q);
+ slice.putQueue(q);
} else {
break;
}
if (queue.isFinished()) {
LOG.trace("Queue {} is finished", queue);
it.remove();
- retireQueue(queue);
+ slice.putQueue(queue);
}
return true;
* the flush task, which will deal with the rest of the shutdown process.
*/
shutdownOffset = currentQueue.startShutdown();
- queueCache.clear();
+ if (slice != null) {
+ slice.decRef();
+ slice = null;
+ }
+
LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
scheduleFlush();
}