BUG-3219: implement a shared stash for queue reuse 40/20740/5
authorRobert Varga <rovarga@cisco.com>
Tue, 19 May 2015 15:14:06 +0000 (17:14 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 21 May 2015 12:59:15 +0000 (14:59 +0200)
This introduces a global per-size stash for reuse of OutbountQueueImpl
objects. This allows for efficient reuse across multiple sessions, which
means we minimize stale memory use when some channels are idle.

The stash is implemented in terms of a ConcurrentLinkedDeque, which is
flushed from the tail and has soft references. We rely on
FinalizableReference to notify us when a particular queue is finished.

Change-Id: I78e654a0c3470bbd1c9274237b2be04f50459809
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCache.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCacheSlice.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java

index cf2693c80276a4c587b3bc0f45d47af17d8078e5..ded62f57bd41844bc0e86bb99100dfc72084eb6a 100644 (file)
@@ -508,7 +508,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
 
-        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, OutboundQueueCache.getInstance().getSlice(maxQueueDepth), maxBarrierNanos);
         outputManager = ret;
         channel.pipeline().addLast(outputManager);
 
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCache.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCache.java
new file mode 100644 (file)
index 0000000..f65c5a8
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import java.util.HashMap;
+import java.util.Map;
+
+final class OutboundQueueCache {
+    private static final OutboundQueueCache INSTANCE = new OutboundQueueCache();
+
+    private final Map<Integer, OutboundQueueCacheSlice> slices = new HashMap<>();
+
+    private OutboundQueueCache() {
+
+    }
+
+    static OutboundQueueCache getInstance() {
+        return INSTANCE;
+    }
+
+    synchronized OutboundQueueCacheSlice getSlice(final int queueSize) {
+        final OutboundQueueCacheSlice oldSlice = slices.get(queueSize);
+        if (oldSlice != null) {
+            oldSlice.incRef();
+            return oldSlice;
+        }
+
+        final OutboundQueueCacheSlice newSlice = new OutboundQueueCacheSlice(queueSize);
+        slices.put(queueSize, newSlice);
+        return newSlice;
+    }
+
+    synchronized void putSlice(final OutboundQueueCacheSlice slice) {
+        if (slice.decRef()) {
+            slices.remove(slice.getQueueSize());
+        }
+    }
+}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCacheSlice.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCacheSlice.java
new file mode 100644 (file)
index 0000000..a514589
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.FinalizableSoftReference;
+import com.google.common.base.Preconditions;
+import java.lang.ref.Reference;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueCacheSlice {
+    private static final class QueueRef extends FinalizableSoftReference<OutboundQueueImpl> {
+        private final Collection<?> cache;
+
+        protected QueueRef(final FinalizableReferenceQueue queue, final Collection<?> cache, final OutboundQueueImpl referent) {
+            super(referent, queue);
+            this.cache = Preconditions.checkNotNull(cache);
+        }
+
+        @Override
+        public void finalizeReferent() {
+            cache.remove(this);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueCacheSlice.class);
+    private final FinalizableReferenceQueue refQueue = new FinalizableReferenceQueue();
+
+    private final ConcurrentLinkedDeque<QueueRef> cache = new ConcurrentLinkedDeque<>();
+    private final int queueSize;
+    private int refCount = 1;
+
+    OutboundQueueCacheSlice(final int queueSize) {
+        Preconditions.checkArgument(queueSize >= 1);
+        this.queueSize = queueSize;
+    }
+
+    void remove(final QueueRef queueRef) {
+        cache.remove(queueRef);
+    }
+
+    boolean decRef() {
+        if (--refCount == 0) {
+            refQueue.close();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    void incRef() {
+        refCount++;
+    }
+
+    int getQueueSize() {
+        return queueSize;
+    }
+
+    OutboundQueueImpl getQueue(final OutboundQueueManager<?> manager, final long baseXid) {
+        final OutboundQueueImpl ret;
+        OutboundQueueImpl cached = null;
+        for (;;) {
+            final Reference<OutboundQueueImpl> item = cache.pollLast();
+            if (item == null) {
+                break;
+            }
+
+            cached = item.get();
+            if (cached != null) {
+                ret = cached.reuse(manager, baseXid);
+                LOG.trace("Reusing queue {} as {} on manager {}", cached, ret, manager);
+                return ret;
+            }
+        }
+
+        ret = new OutboundQueueImpl(manager, baseXid, queueSize + 1);
+        LOG.trace("Allocated new queue {} on manager {}", ret, manager);
+        return ret;
+    }
+
+    void putQueue(final OutboundQueueImpl queue) {
+        queue.retire();
+        if (cache.offer(new QueueRef(refQueue, cache, queue))) {
+            LOG.trace("Saving queue {} for later reuse", queue);
+        } else {
+            LOG.trace("Queue {} thrown away", queue);
+        }
+    }
+}
index 2433637f1bbc72148c89dafec2d38965f76d6759..5698eeec902382690f1cf45ef1f2b79a895e4535 100644 (file)
@@ -74,7 +74,7 @@ final class OutboundQueueImpl implements OutboundQueue {
         }
     }
 
-    OutboundQueueImpl reuse(final long baseXid) {
+    OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
         return new OutboundQueueImpl(manager, baseXid, queue);
     }
 
index f2a4b0e551b9e84162aad8118a2651e11984a5d5..948424f91cfb063d96cf3c95ebf3cc65205d6a75 100644 (file)
@@ -11,7 +11,6 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -44,23 +43,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     private static final int WORKTIME_RECHECK_MSGS = 64;
 
-    /**
-     * We maintain a cache of this many previous queues for later reuse.
-     */
-    private static final int QUEUE_CACHE_SIZE = 4;
-
-    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
     private final long maxBarrierNanos;
     private final long maxWorkTime;
-    private final int queueSize;
     private final T handler;
 
     // Updated from netty only
     private long lastBarrierNanos = System.nanoTime();
+    private OutboundQueueCacheSlice slice;
     private OutboundQueueImpl currentQueue;
     private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
@@ -84,17 +77,16 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-        final int queueSize, final long maxBarrierNanos) {
+        final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
-        Preconditions.checkArgument(queueSize > 0);
-        this.queueSize = queueSize;
+        this.slice = Preconditions.checkNotNull(slice);
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
         this.address = address;
         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
 
-        LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+        LOG.debug("Queue manager instantiated with queue slice {}", slice);
         createQueue();
     }
 
@@ -105,31 +97,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
-    }
-
-    private void retireQueue(final OutboundQueueImpl queue) {
-        if (queueCache.offer(queue)) {
-            queue.retire();
-            LOG.trace("Saving queue {} for later reuse", queue);
-        } else {
-            LOG.trace("Queue {} thrown away", queue);
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
         }
     }
 
     private void createQueue() {
         final long baseXid = lastXid;
-        lastXid += queueSize + 1;
-
-        final OutboundQueueImpl cached = queueCache.poll();
-        final OutboundQueueImpl queue;
-        if (cached != null) {
-            queue = cached.reuse(baseXid);
-            LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
-        } else {
-            queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
-        }
+        lastXid += slice.getQueueSize() + 1;
 
+        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
         activeQueues.add(queue);
         currentQueue = queue;
         handler.onConnectionQueueChanged(queue);
@@ -188,7 +166,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             lastBarrierNanos = now;
         } else {
             nonBarrierMessages++;
-            if (nonBarrierMessages >= queueSize) {
+            if (nonBarrierMessages >= slice.getQueueSize()) {
                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
             } else if (!barrierTimerEnabled) {
@@ -235,7 +213,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                         LOG.trace("Queue {} is implied finished", q);
                         q.completeAll();
                         it.remove();
-                        retireQueue(q);
+                        slice.putQueue(q);
                     } else {
                         break;
                     }
@@ -245,7 +223,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             if (queue.isFinished()) {
                 LOG.trace("Queue {} is finished", queue);
                 it.remove();
-                retireQueue(queue);
+                slice.putQueue(queue);
             }
 
             return true;
@@ -445,7 +423,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
          * the flush task, which will deal with the rest of the shutdown process.
          */
         shutdownOffset = currentQueue.startShutdown();
-        queueCache.clear();
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
+        }
+
         LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
         scheduleFlush();
     }