From 2a893dae70973aef528849271e35fb28bda13d19 Mon Sep 17 00:00:00 2001 From: Jan Medved Date: Sun, 11 May 2014 22:39:50 -0700 Subject: [PATCH] BUG-985 - finisher queue is unbound and causes out of memory error under stress - replaced queue.offer with queue.put (blocking behavior) Changed the initialization of the TicketFinisher queue to limit the size of the queue to 1,000 elements. Backpressure will be exerted when the max queue size has been reached. This is to fix an out-of-memory condition that occurs under heavy load when the TicketFinisher queue grows w/o bounds. Change-Id: I619d1c27db98fec95380bef2504add3025195abc Signed-off-by: Jan Medved Signed-off-by: Michal Rehak --- .../openflow/md/queue/QueueKeeperLightImpl.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java index 8b19cdffe0..78682d3f39 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java @@ -71,7 +71,7 @@ public class QueueKeeperLightImpl implements QueueKeeper { * prepare queue */ public void init() { - processQueue = new LinkedBlockingQueue<>(); + processQueue = new LinkedBlockingQueue<>(1000); pool = new ScheduledThreadPoolExecutor(poolSize); ticketProcessorFactory = new TicketProcessorFactory<>(); @@ -105,9 +105,12 @@ public class QueueKeeperLightImpl implements QueueKeeper { ticket.setMessage(message); LOG.debug("ticket scheduling: {}, ticket: {}", message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket)); - //TODO: block if queue limit reached - processQueue.add(ticket); - scheduleTicket(ticket); + try { + processQueue.put(ticket); + scheduleTicket(ticket); + } catch (InterruptedException e) { + LOG.warn("message enqueing interrupted", e); + } } else if (queueType == QueueKeeper.QueueType.UNORDERED){ List processedMessages = translate(message,conductor); pop(processedMessages,conductor); -- 2.36.6