There were couple of things wrong with this code: flipping 'starving'
field needlessly, which could end up in needless calls to notify().
Split that field into local state and volatile status indicator.
finished is not volatile, so the checks can actually get removed at
runtime, breaking the shutdown() functionality. Also an explicit ping()
is needed to wake a starving harvester.
Change-Id: I58b46a3f2e593e3873c47c30d2fe0d22d4304e8f
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
/**
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
/**
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
import org.slf4j.LoggerFactory;
/**
import org.slf4j.LoggerFactory;
/**
*/
public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
*/
public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
- private Enqueuer<QueueItem<IN>> enqueuer;
- private Collection<QueueKeeper<IN>> messageSources;
+ private final Collection<QueueKeeper<IN>> messageSources;
+ private final Enqueuer<QueueItem<IN>> enqueuer;
+ private final Object harvestLock = new Object();
+ private volatile boolean finishing = false;
+ private volatile boolean wakeMe = false;
- private boolean finishing = false;
- private boolean starving;
-
- private Object harvestLock;
-
-
/**
* @param enqueuer
* @param messageSources
/**
* @param enqueuer
* @param messageSources
- public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
- Collection<QueueKeeper<IN>> messageSources) {
+ public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
+ final Collection<QueueKeeper<IN>> messageSources) {
this.enqueuer = enqueuer;
this.messageSources = messageSources;
this.enqueuer = enqueuer;
this.messageSources = messageSources;
- harvestLock = new Object();
}
@Override
public void run() {
}
@Override
public void run() {
- while (! finishing ) {
- starving = true;
+ while (!finishing) {
+ boolean starving = true;
for (QueueKeeper<IN> source : messageSources) {
QueueItem<IN> qItem = source.poll();
if (qItem != null) {
for (QueueKeeper<IN> source : messageSources) {
QueueItem<IN> qItem = source.poll();
if (qItem != null) {
enqueuer.enqueueQueueItem(qItem);
}
}
enqueuer.enqueueQueueItem(qItem);
}
}
+ LOG.trace("messageHarvester is about to make a starve sleep");
synchronized (harvestLock) {
synchronized (harvestLock) {
- if (starving) {
- LOG.trace("messageHarvester is about to make a starve sleep");
- harvestLock.wait();
- LOG.trace("messageHarvester is waking up from a starve sleep");
- }
+ this.harvestLock.wait();
+ LOG.trace("messageHarvester is waking up from a starve sleep");
} catch (InterruptedException e) {
LOG.warn("message harvester has been interrupted during starve sleep", e);
} catch (InterruptedException e) {
LOG.warn("message harvester has been interrupted during starve sleep", e);
+ } finally {
+ wakeMe = false;
/**
* finish harvester
*/
public void shutdown() {
this.finishing = true;
/**
* finish harvester
*/
public void shutdown() {
this.finishing = true;
@Override
public void ping() {
@Override
public void ping() {
- if (starving) {
- LOG.debug("pinging message harvester in starve status");
synchronized (harvestLock) {
synchronized (harvestLock) {
- if (starving) {
- starving = false;
+ // Might've raced while waiting for lock, so need to recheck
+ if (wakeMe) {
+ LOG.debug("pinging message harvester in starve status");
harvestLock.notify();
}
}
}
}
harvestLock.notify();
}
}
}
}
+}
\ No newline at end of file