/** * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.openflow.md.queue; import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @param * */ public class QueueKeeperHarvester implements Runnable, HarvesterHandle { private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class); private Enqueuer> enqueuer; private Collection> messageSources; private boolean finishing = false; private boolean starving; private Object harvestLock; /** * @param enqueuer * @param messageSources * @param harvestLock */ public QueueKeeperHarvester(Enqueuer> enqueuer, Collection> messageSources) { this.enqueuer = enqueuer; this.messageSources = messageSources; harvestLock = new Object(); } @Override public void run() { while (! finishing ) { starving = true; for (QueueKeeper source : messageSources) { QueueItem qItem = source.poll(); if (qItem != null) { starving = false; enqueuer.enqueueQueueItem(qItem); } } if (starving) { synchronized (harvestLock) { try { if (starving) { LOG.trace("messageHarvester is about to make a starve sleep"); harvestLock.wait(); LOG.trace("messageHarvester is waking up from a starve sleep"); } } catch (InterruptedException e) { LOG.warn("message harvester has been interrupted during starve sleep", e); } } } } } /** * finish harvester */ public void shutdown() { this.finishing = true; } @Override public void ping() { if (starving) { LOG.debug("pinging message harvester in starve status"); synchronized (harvestLock) { if (starving) { starving = false; harvestLock.notify(); } } } } }