/**
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
+ *
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
import org.slf4j.LoggerFactory;
/**
- * @param <IN>
- *
+ * @param <IN>
+ *
*/
public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
- private Enqueuer<QueueItem<IN>> enqueuer;
- private Collection<QueueKeeper<IN>> messageSources;
+ private final Collection<QueueKeeper<IN>> messageSources;
+ private final Enqueuer<QueueItem<IN>> enqueuer;
+ private final Object harvestLock = new Object();
+ private volatile boolean finishing = false;
+ private volatile boolean wakeMe = false;
- private boolean finishing = false;
- private boolean starving;
-
- private Object harvestLock;
-
-
/**
* @param enqueuer
* @param messageSources
- * @param harvestLock
+ * @param harvestLock
*/
- public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
- Collection<QueueKeeper<IN>> messageSources) {
+ public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
+ final Collection<QueueKeeper<IN>> messageSources) {
this.enqueuer = enqueuer;
this.messageSources = messageSources;
- harvestLock = new Object();
}
@Override
public void run() {
- while (! finishing ) {
- starving = true;
+ while (!finishing) {
+ boolean starving = true;
for (QueueKeeper<IN> source : messageSources) {
QueueItem<IN> qItem = source.poll();
if (qItem != null) {
enqueuer.enqueueQueueItem(qItem);
}
}
-
+
if (starving) {
+ LOG.trace("messageHarvester is about to make a starve sleep");
synchronized (harvestLock) {
+ wakeMe = true;
try {
- if (starving) {
- LOG.trace("messageHarvester is about to make a starve sleep");
- harvestLock.wait();
- LOG.trace("messageHarvester is waking up from a starve sleep");
- }
+ this.harvestLock.wait();
+ LOG.trace("messageHarvester is waking up from a starve sleep");
} catch (InterruptedException e) {
LOG.warn("message harvester has been interrupted during starve sleep", e);
+ } finally {
+ wakeMe = false;
}
}
}
}
}
-
+
/**
* finish harvester
*/
public void shutdown() {
this.finishing = true;
+ ping();
}
-
+
@Override
public void ping() {
- if (starving) {
- LOG.debug("pinging message harvester in starve status");
+ if (wakeMe) {
synchronized (harvestLock) {
- if (starving) {
- starving = false;
+ // Might've raced while waiting for lock, so need to recheck
+ if (wakeMe) {
+ LOG.debug("pinging message harvester in starve status");
harvestLock.notify();
}
}
}
}
-}
+}
\ No newline at end of file