BUG-985 - finisher queue is unbound 85/6885/4
authorJan Medved <jmedved@cisco.com>
Mon, 12 May 2014 05:39:50 +0000 (22:39 -0700)
committerMichal Rehak <mirehak@cisco.com>
Mon, 12 May 2014 17:28:49 +0000 (19:28 +0200)
and causes out of memory error under stress

- replaced queue.offer with queue.put (blocking behavior)

Changed the initialization of the TicketFinisher queue to limit the size of the queue to 1,000 elements. Backpressure will be exerted when the max queue size has been reached. This is to fix an out-of-memory condition that occurs under heavy load when the TicketFinisher queue grows w/o bounds.

Change-Id: I619d1c27db98fec95380bef2504add3025195abc
Signed-off-by: Jan Medved <jmedved@cisco.com>
Signed-off-by: Michal Rehak <mirehak@cisco.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java

index 8b19cdffe0b4ff03de10000b469e7b7723b67738..78682d3f3907b86f3a41db70e3761479277cfd3e 100644 (file)
@@ -71,7 +71,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
      * prepare queue
      */
     public void init() {
-        processQueue = new LinkedBlockingQueue<>();
+        processQueue = new LinkedBlockingQueue<>(1000);
         pool = new ScheduledThreadPoolExecutor(poolSize);
 
         ticketProcessorFactory = new TicketProcessorFactory<>();
@@ -105,9 +105,12 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
             ticket.setMessage(message);
             LOG.debug("ticket scheduling: {}, ticket: {}",
                     message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
-            //TODO: block if queue limit reached
-            processQueue.add(ticket);
-            scheduleTicket(ticket);
+            try {
+                processQueue.put(ticket);
+                scheduleTicket(ticket);
+            } catch (InterruptedException e) {
+                LOG.warn("message enqueing interrupted", e);
+            }
         } else if (queueType == QueueKeeper.QueueType.UNORDERED){
             List<DataObject> processedMessages = translate(message,conductor);
             pop(processedMessages,conductor);