import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType,
+ ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+ private static final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+ private static final String QUEUE_SIZE = "q-size";
- private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
-
- private MeteredMessageQueue queue;
- private Integer capacity;
- private FiniteDuration pushTimeOut;
- private MetricRegistry registry;
-
- private final String QUEUE_SIZE = "q-size";
+ private final Integer capacity;
+ private final FiniteDuration pushTimeOut;
+ private final MetricRegistry registry;
public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
this.capacity = commonConfig.getMailBoxCapacity();
this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
- MetricsReporter reporter = MetricsReporter.getInstance();
+ MetricsReporter reporter = MetricsReporter.getInstance(MeteringBehavior.DOMAIN);
registry = reporter.getMetricsRegistry();
}
@Override
public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
- this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
- monitorQueueSize(owner, this.queue);
- return this.queue;
+ final MeteredMessageQueue queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+ monitorQueueSize(owner, queue);
+ return queue;
}
private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
String actorName = owner.get().path().toStringWithoutAddress();
String metricName = MetricRegistry.name(actorName, QUEUE_SIZE);
- if (registry.getMetrics().containsKey(metricName))
+ if (registry.getMetrics().containsKey(metricName)) {
return; //already registered
+ }
- Gauge queueSize = getQueueSizeGuage(monitoredQueue);
+ Gauge<Integer> queueSize = getQueueSizeGuage(monitoredQueue);
registerQueueSizeMetric(metricName, queueSize);
}
public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
+ private static final long serialVersionUID = 1L;
public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
super(capacity, pushTimeOut);
}
}
- private Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
- return new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return monitoredQueue.size();
- }
- };
+ private static Gauge<Integer> getQueueSizeGuage(final MeteredMessageQueue monitoredQueue) {
+ return () -> monitoredQueue.size();
}
- private void registerQueueSizeMetric(String metricName, Gauge metric){
+ private void registerQueueSizeMetric(String metricName, Gauge<Integer> metric) {
try {
registry.register(metricName,metric);
} catch (IllegalArgumentException e) {