Bug 5740: Add Deque-based control-aware mailbox
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / UnboundedDequeBasedControlAwareMailbox.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.common.actor;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSystem;
12 import akka.dispatch.ControlMessage;
13 import akka.dispatch.DequeBasedMessageQueueSemantics;
14 import akka.dispatch.Envelope;
15 import akka.dispatch.MailboxType;
16 import akka.dispatch.ProducesMessageQueue;
17 import akka.dispatch.UnboundedControlAwareMailbox;
18 import com.codahale.metrics.Gauge;
19 import com.typesafe.config.Config;
20 import java.util.Deque;
21 import java.util.Queue;
22 import java.util.concurrent.ConcurrentLinkedDeque;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.Option;
26
27 /**
28  * An unbounded ControlAwareMailbox that also supports DequeBasedMessageQueueSemantics so it can be used with
29  * persistent actors which use stashing.
30  *
31  * @author Thomas Pantelis
32  */
33 public class UnboundedDequeBasedControlAwareMailbox implements MailboxType,
34         ProducesMessageQueue<UnboundedDequeBasedControlAwareMailbox.MessageQueue> {
35     private static final Logger LOG = LoggerFactory.getLogger(UnboundedDequeBasedControlAwareMailbox.class);
36     private static final String NORMAL_QUEUE_SIZE = "normal-q-size";
37     private static final String CONTROL_QUEUE_SIZE = "control-q-size";
38     private static final String TOTAL_QUEUE_SIZE = "total-q-size";
39
40     public UnboundedDequeBasedControlAwareMailbox(ActorSystem.Settings settings, Config config) {
41     }
42
43     @Override
44     public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
45         LOG.debug("Creating MessageQueue for {}", owner);
46
47         final MessageQueue queue = new MessageQueue();
48
49         MeteredBoundedMailbox.registerMetric(owner, NORMAL_QUEUE_SIZE, (Gauge<Integer>) () -> queue.queue().size());
50         MeteredBoundedMailbox.registerMetric(owner, CONTROL_QUEUE_SIZE,
51             (Gauge<Integer>) () -> queue.controlQueue().size());
52         MeteredBoundedMailbox.registerMetric(owner, TOTAL_QUEUE_SIZE, (Gauge<Integer>) queue::numberOfMessages);
53
54         return queue;
55     }
56
57     static class MessageQueue extends UnboundedControlAwareMailbox.MessageQueue
58             implements DequeBasedMessageQueueSemantics {
59         private static final long serialVersionUID = 1L;
60
61         private final Deque<Envelope> controlQueue = new ConcurrentLinkedDeque<>();
62         private final Deque<Envelope> queue = new ConcurrentLinkedDeque<>();
63
64         @Override
65         public Queue<Envelope> controlQueue() {
66             return controlQueue;
67         }
68
69         @Override
70         public Queue<Envelope> queue() {
71             return queue;
72         }
73
74         @Override
75         public void enqueueFirst(ActorRef actor, Envelope envelope) {
76             final Object message = envelope.message();
77             LOG.debug("enqueueFirst: actor {}, message type: {}", actor, message.getClass());
78             if (message instanceof ControlMessage) {
79                 LOG.debug("Adding {} to the ControlMessage queue", message.getClass());
80                 controlQueue.addFirst(envelope);
81             } else {
82                 LOG.debug("Adding {} to the normal queue", message.getClass());
83                 queue.addFirst(envelope);
84             }
85         }
86     }
87 }