import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.BoundedMailbox;
+import akka.dispatch.BoundedDequeBasedMailbox;
import akka.dispatch.MailboxType;
-import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.TimeUnit;
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<BoundedMailbox.MessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
private MeteredMessageQueue queue;
private Integer capacity;
private MetricsReporter reporter;
private final String QUEUE_SIZE = "queue-size";
+ private final String CAPACITY = "mailbox-capacity";
+ private final String TIMEOUT = "mailbox-push-timeout-time";
private final Long DEFAULT_TIMEOUT = 10L;
public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
- this.capacity = config.getInt("mailbox-capacity");
+ this.capacity = config.getInt(CAPACITY);
Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
Long timeout = -1L;
- if ( config.hasPath("mailbox-push-timeout-time") ){
- timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS);
+ if ( config.hasPath(TIMEOUT) ){
+ timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
} else {
timeout = DEFAULT_TIMEOUT;
}
@Override
- public MessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+ public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
monitorQueueSize(owner, this.queue);
return this.queue;
return; //there's no actor to monitor
}
actorPath = owner.get().path();
- MetricRegistry registry = reporter.getMetricsRegistry();
+ String actorInstanceId = Integer.toString(owner.get().hashCode());
- String actorName = registry.name(actorPath.toString(), QUEUE_SIZE);
+ MetricRegistry registry = reporter.getMetricsRegistry();
+ String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
if (registry.getMetrics().containsKey(actorName))
return; //already registered
- reporter.getMetricsRegistry().register(actorName,
+ registry.register(actorName,
new Gauge<Integer>() {
@Override
public Integer getValue() {
}
- public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue {
+ public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
super(capacity, pushTimeOut);