</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-osgi_${scala.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.version}</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
--- /dev/null
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractConfig implements UnifiedConfig {
+
+ private Config config;
+
+ public AbstractConfig(Config config){
+ this.config = config;
+ }
+
+ @Override
+ public Config get() {
+ return config;
+ }
+
+ public static abstract class Builder<T extends Builder>{
+ protected Map<String, Object> configHolder;
+ protected Config fallback;
+
+ private final String actorSystemName;
+
+ public Builder(String actorSystemName){
+ Preconditions.checkArgument(actorSystemName != null, "Actor system name must not be null");
+ this.actorSystemName = actorSystemName;
+ configHolder = new HashMap<>();
+ }
+
+ public T withConfigReader(AkkaConfigurationReader reader){
+ fallback = reader.read().getConfig(actorSystemName);
+ return (T)this;
+ }
+
+ protected Config merge(){
+ if (fallback == null)
+ fallback = ConfigFactory.load().getConfig(actorSystemName);
+
+ return ConfigFactory.parseMap(configHolder).withFallback(fallback);
+ }
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.common.actor;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.Monitor;
public abstract class AbstractUntypedActor extends UntypedActor {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
-
public AbstractUntypedActor() {
LOG.debug("Actor created {}", getSelf());
getContext().
system().
actorSelection("user/termination-monitor").
tell(new Monitor(getSelf()), getSelf());
+
}
@Override public void onReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message.getClass().getSimpleName());
+ final String messageType = message.getClass().getSimpleName();
+ LOG.debug("Received message {}", messageType);
+
handleReceive(message);
- LOG.debug("Done handling message {}",
- message.getClass().getSimpleName());
+
+ LOG.debug("Done handling message {}", messageType);
}
protected abstract void handleReceive(Object message) throws Exception;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+/**
+ * Actor with its behaviour metered. Metering is enabled by configuration.
+ */
+public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor {
+
+ public AbstractUntypedActorWithMetering() {
+ if (isMetricsCaptureEnabled())
+ getContext().become(new MeteringBehavior(this));
+ }
+
+ private boolean isMetricsCaptureEnabled(){
+ CommonConfig config = new CommonConfig(getContext().system().settings().config());
+ return config.isMetricCaptureEnabled();
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.cluster.common.actor;
import com.typesafe.config.Config;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CommonConfig extends AbstractConfig {
+
+ protected static final String TAG_ACTOR_SYSTEM_NAME = "actor-system-name";
+ protected static final String TAG_METRIC_CAPTURE_ENABLED = "metric-capture-enabled";
+ protected static final String TAG_MAILBOX_CAPACITY = "mailbox-capacity";
+ protected static final String TAG_MAILBOX = "bounded-mailbox";
+ protected static final String TAG_MAILBOX_PUSH_TIMEOUT = "mailbox-push-timeout-time";
+
+ //TODO: Ideally these defaults should go to reference.conf
+ // https://bugs.opendaylight.org/show_bug.cgi?id=1709
+ private static final int DEFAULT_MAILBOX_CAPACITY = 1000;
+ private static final int DEFAULT_MAILBOX_PUSH_TIMEOUT = 100;
+
+ //locally cached values
+ private FiniteDuration cachedMailBoxPushTimeout;
+ private Integer cachedMailBoxCapacity;
+ private Boolean cachedMetricCaptureEnableFlag;
+
+ public CommonConfig(Config config) {
+ super(config);
+ }
+
+ public String getActorSystemName() {
+ return get().getString(TAG_ACTOR_SYSTEM_NAME);
+ }
+
+ public boolean isMetricCaptureEnabled(){
+ if (cachedMetricCaptureEnableFlag != null){
+ return cachedMetricCaptureEnableFlag;
+ }
+
+ cachedMetricCaptureEnableFlag = get().hasPath(TAG_METRIC_CAPTURE_ENABLED)
+ ? get().getBoolean(TAG_METRIC_CAPTURE_ENABLED)
+ : false;
+
+ return cachedMetricCaptureEnableFlag;
+ }
+
+ public String getMailBoxName() {
+ return TAG_MAILBOX;
+ }
+
+ public Integer getMailBoxCapacity() {
+
+ if (cachedMailBoxCapacity != null) {
+ return cachedMailBoxCapacity;
+ }
+
+ final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_CAPACITY).toString();
+ cachedMailBoxCapacity = get().hasPath(PATH)
+ ? get().getInt(PATH)
+ : DEFAULT_MAILBOX_CAPACITY;
+
+ return cachedMailBoxCapacity;
+ }
+
+ public FiniteDuration getMailBoxPushTimeout() {
+
+ if (cachedMailBoxPushTimeout != null) {
+ return cachedMailBoxPushTimeout;
+ }
+
+ final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_PUSH_TIMEOUT).toString();
+
+ long timeout = get().hasPath(PATH)
+ ? get().getDuration(PATH, TimeUnit.NANOSECONDS)
+ : DEFAULT_MAILBOX_PUSH_TIMEOUT;
+
+ cachedMailBoxPushTimeout = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
+ return cachedMailBoxPushTimeout;
+ }
+
+ public static class Builder<T extends Builder> extends AbstractConfig.Builder<T>{
+
+ public Builder(String actorSystemName) {
+ super(actorSystemName);
+
+ //actor system config
+ configHolder.put(TAG_ACTOR_SYSTEM_NAME, actorSystemName);
+
+ //config for bounded mailbox
+ configHolder.put(TAG_MAILBOX, new HashMap<String, Object>());
+ }
+
+ public T metricCaptureEnabled(boolean enabled) {
+ configHolder.put(TAG_METRIC_CAPTURE_ENABLED, String.valueOf(enabled));
+ return (T)this;
+ }
+
+ public T mailboxCapacity(int capacity) {
+ Preconditions.checkArgument(capacity > 0, "mailbox capacity must be >0");
+
+ Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+ boundedMailbox.put(TAG_MAILBOX_CAPACITY, capacity);
+ return (T)this;
+ }
+
+ public T mailboxPushTimeout(String timeout){
+ Duration pushTimeout = Duration.create(timeout);
+ Preconditions.checkArgument(pushTimeout.isFinite(), "invalid value for mailbox push timeout");
+
+ Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+ boundedMailbox.put(TAG_MAILBOX_PUSH_TIMEOUT, timeout);
+ return (T)this;
+ }
+
+ public CommonConfig build() {
+ return new CommonConfig(merge());
+ }
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.cluster.common.actor;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.BoundedDequeBasedMailbox;
+import akka.dispatch.MailboxType;
+import akka.dispatch.ProducesMessageQueue;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+
+ private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+
+ private MeteredMessageQueue queue;
+ private Integer capacity;
+ private FiniteDuration pushTimeOut;
+ private MetricRegistry registry;
+
+ private final String QUEUE_SIZE = "q-size";
+
+ public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+
+ CommonConfig commonConfig = new CommonConfig(settings.config());
+ this.capacity = commonConfig.getMailBoxCapacity();
+ this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
+
+ MetricsReporter reporter = MetricsReporter.getInstance();
+ registry = reporter.getMetricsRegistry();
+ }
+
+
+ @Override
+ public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+ this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+ monitorQueueSize(owner, this.queue);
+ return this.queue;
+ }
+
+ private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+ if (owner.isEmpty()) {
+ return; //there's no actor to monitor
+ }
+ String actorName = owner.get().path().toStringWithoutAddress();
+ String metricName = registry.name(actorName, QUEUE_SIZE);
+
+ if (registry.getMetrics().containsKey(metricName))
+ return; //already registered
+
+ Gauge queueSize = getQueueSizeGuage(monitoredQueue);
+ registerQueueSizeMetric(metricName, queueSize);
+ }
+
+
+ public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
+
+ public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+ super(capacity, pushTimeOut);
+ }
+ }
+
+ private Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
+ return new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return monitoredQueue.size();
+ }
+ };
+ }
+
+ private void registerQueueSizeMetric(String metricName, Gauge metric){
+ try {
+ registry.register(metricName,metric);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
+ }
+ }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.UntypedActor;
+import akka.japi.Procedure;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+
+/**
+ * Represents behaviour that can be exhibited by actors of type {@link akka.actor.UntypedActor}
+ * <p/>
+ * This behaviour meters actor's default behaviour. It captures 2 metrics:
+ * <ul>
+ * <li>message processing rate of actor's receive block</li>
+ * <li>message processing rate by message type</li>
+ * </ul>
+ *
+ * The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter}
+ */
+public class MeteringBehavior implements Procedure<Object> {
+
+ private final UntypedActor meteredActor;
+
+ private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry();
+ private final String MSG_PROCESSING_RATE = "msg-rate";
+
+ private String actorName;
+ private Timer msgProcessingTimer;
+
+ /**
+ *
+ * @param actor whose behaviour needs to be metered
+ */
+ public MeteringBehavior(UntypedActor actor){
+ Preconditions.checkArgument(actor != null, "actor must not be null");
+
+ this.meteredActor = actor;
+ actorName = meteredActor.getSelf().path().toStringWithoutAddress();
+ final String msgProcessingTime = MetricRegistry.name(actorName, MSG_PROCESSING_RATE);
+ msgProcessingTimer = METRICREGISTRY.timer(msgProcessingTime);
+ }
+
+ /**
+ * Uses 2 timers to measure message processing rate. One for overall message processing rate and
+ * another to measure rate by message type. The timers are re-used if they were previously created.
+ * <p/>
+ * {@link com.codahale.metrics.MetricRegistry} maintains a reservoir for different timers where
+ * collected timings are kept. It exposes various metrics for each timer based on collected
+ * data. Eg: count of messages, 99, 95, 50... percentiles, max, mean etc.
+ * <p/>
+ * These metrics are exposed as JMX bean.
+ *
+ * @see <a href="http://dropwizard.github.io/metrics/manual/core/#timers">
+ * http://dropwizard.github.io/metrics/manual/core/#timers</a>
+ *
+ * @param message
+ * @throws Exception
+ */
+ @Override
+ public void apply(Object message) throws Exception {
+ final String messageType = message.getClass().getSimpleName();
+
+ final String msgProcessingTimeByMsgType =
+ MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType);
+
+ final Timer msgProcessingTimerByMsgType = METRICREGISTRY.timer(msgProcessingTimeByMsgType);
+
+ //start timers
+ final Timer.Context context = msgProcessingTimer.time();
+ final Timer.Context contextByMsgType = msgProcessingTimerByMsgType.time();
+
+ meteredActor.onReceive(message);
+
+ //stop timers
+ contextByMsgType.stop();
+ context.stop();
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.messages;
+package org.opendaylight.controller.cluster.common.actor;
import akka.actor.ActorRef;
private final ActorRef actorRef;
public Monitor(ActorRef actorRef){
-
this.actorRef = actorRef;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.typesafe.config.Config;
+
+/**
+ * Represents a unified view of configuration.
+ * <p/>
+ * It merges configuration from:
+ * <ul>
+ * <li>Config subsystem</li>
+ * <li>Akka configuration files</li>
+ * </ul>
+ *
+ * Configurations defined in config subsystem takes precedence.
+ */
+public interface UnifiedConfig {
+
+ /**
+ * Returns an immutable instance of unified configuration
+ * @return
+ */
+ public Config get();
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.common.reporting;
+package org.opendaylight.controller.cluster.reporting;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
public class MetricsReporter implements AutoCloseable{
private final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
- private final String DOMAIN = "org.opendaylight.controller";
+ private final String DOMAIN = "org.opendaylight.controller.actor.metric";
public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.common.actor;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.BoundedDequeBasedMailbox;
-import akka.dispatch.MailboxType;
-import akka.dispatch.ProducesMessageQueue;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.opendaylight.controller.common.reporting.MetricsReporter;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
-
- private MeteredMessageQueue queue;
- private Integer capacity;
- private FiniteDuration pushTimeOut;
- private ActorPath actorPath;
- private MetricsReporter reporter;
-
- private final String QUEUE_SIZE = "queue-size";
- private final String CAPACITY = "mailbox-capacity";
- private final String TIMEOUT = "mailbox-push-timeout-time";
- private final Long DEFAULT_TIMEOUT = 10L;
-
- public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
- Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
- this.capacity = config.getInt(CAPACITY);
- Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
-
- Long timeout = -1L;
- if ( config.hasPath(TIMEOUT) ){
- timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
- } else {
- timeout = DEFAULT_TIMEOUT;
- }
- Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0");
- this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
-
- reporter = MetricsReporter.getInstance();
- }
-
-
- @Override
- public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
- this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
- monitorQueueSize(owner, this.queue);
- return this.queue;
- }
-
- private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
- if (owner.isEmpty()) {
- return; //there's no actor to monitor
- }
- actorPath = owner.get().path();
- String actorInstanceId = Integer.toString(owner.get().hashCode());
-
- MetricRegistry registry = reporter.getMetricsRegistry();
- String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
-
- if (registry.getMetrics().containsKey(actorName))
- return; //already registered
-
- registry.register(actorName,
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return monitoredQueue.size();
- }
- });
- }
-
-
- public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
-
- public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
- super(capacity, pushTimeOut);
- }
- }
-
-}
-
--- /dev/null
+package org.opendaylight.controller.cluster.common.actor;
+
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CommonConfigTest {
+
+ @Test
+ public void testCommonConfigDefaults(){
+ CommonConfig config = new CommonConfig.Builder<>("testsystem").build();
+
+ assertNotNull(config.getActorSystemName());
+ assertNotNull(config.getMailBoxCapacity());
+ assertNotNull(config.getMailBoxName());
+ assertNotNull(config.getMailBoxPushTimeout());
+ assertNotNull(config.isMetricCaptureEnabled());
+ }
+
+ @Test
+ public void testCommonConfigOverride(){
+
+ int expectedCapacity = 123;
+ String timeoutValue = "1000ms";
+ CommonConfig config = new CommonConfig.Builder<>("testsystem")
+ .mailboxCapacity(expectedCapacity)
+ .mailboxPushTimeout(timeoutValue)
+ .metricCaptureEnabled(true)
+ .build();
+
+ assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+
+ FiniteDuration expectedTimeout = FiniteDuration.create(1000, TimeUnit.MILLISECONDS);
+ assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+ assertTrue(config.isMetricCaptureEnabled());
+ }
+}
\ No newline at end of file
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.common.actor;
+package org.opendaylight.controller.cluster.common.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
public class MeteredBoundedMailboxTest {
private static ActorSystem actorSystem;
+ private static CommonConfig config;
private final ReentrantLock lock = new ReentrantLock();
@Before
public void setUp() throws Exception {
- actorSystem = ActorSystem.create("testsystem");
+ config = new CommonConfig.Builder<>("testsystem").build();
+ actorSystem = ActorSystem.create("testsystem", config.get());
}
@After
}
@Test
- public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException {
+ public void shouldSendMsgToDeadLetterWhenQueueIsFull() throws InterruptedException {
final JavaTestKit mockReceiver = new JavaTestKit(actorSystem);
actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
final FiniteDuration TWENTY_SEC = new FiniteDuration(20, TimeUnit.SECONDS);
- String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
- ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
+ ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()),
"pingpongactor");
actorSystem.mailboxes().settings();
testsystem {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 10
mailbox-push-timeout-time = 100ms
}
-testsystem {
-
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
-}
\ No newline at end of file
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
<name>dom-broker</name>
</dom-broker>
+ <enable-metric-capture xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">true</enable-metric-capture>
+ <actor-system-name xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">odl-cluster-rpc</actor-system-name>
+ <bounded-mailbox-capacity xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">1000</bounded-mailbox-capacity>
</module>
</modules>
odl-cluster-data {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
- }
+ }
+
+ metric-capture-enabled = true
+
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
odl-cluster-rpc {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+ metric-capture-enabled = true
+
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
+ seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"]
auto-down-unreachable-after = 10s
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.osgi.framework.BundleContext;
-
-import java.io.File;
-
-public class ActorSystemFactory {
-
- public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
- public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
- public static final String CONFIGURATION_NAME = "odl-cluster-data";
-
- private static volatile ActorSystem actorSystem = null;
-
- public static final ActorSystem getInstance(){
- return actorSystem;
- }
-
- /**
- * This method should be called only once during initialization
- *
- * @param bundleContext
- */
- public static final ActorSystem createInstance(final BundleContext bundleContext) {
- if(actorSystem == null) {
- // Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
- synchronized (ActorSystemFactory.class) {
- // Double check
-
- if (actorSystem == null) {
- ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
- ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
- system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
- actorSystem = system;
- }
- }
- }
-
- return actorSystem;
- }
-
-
- private static final Config readAkkaConfiguration(){
- File defaultConfigFile = new File(AKKA_CONF_PATH);
- Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
- return ConfigFactory.parseFile(defaultConfigFile);
- }
-}
import akka.japi.Creator;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
-
+import akka.actor.Props;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.osgi.framework.BundleContext;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
public class DistributedDataStoreFactory {
+
+ public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+ public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+ public static final String CONFIGURATION_NAME = "odl-cluster-data";
+ private static AtomicReference<ActorSystem> actorSystem = new AtomicReference<>();
+
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
- DatastoreContext datastoreContext, BundleContext bundleContext) {
+ DatastoreContext datastoreContext, BundleContext bundleContext) {
- ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
+ ActorSystem actorSystem = getOrCreateInstance(bundleContext);
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
- config, datastoreContext );
+ new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ config, datastoreContext);
+
ShardStrategyFactory.setConfiguration(config);
schemaService.registerSchemaContextListener(dataStore);
return dataStore;
}
+
+ synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) {
+
+ if (actorSystem.get() != null){
+ return actorSystem.get();
+ }
+ // Create an OSGi bundle classloader for actor system
+ BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+ Thread.currentThread().getContextClassLoader());
+
+ ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+ ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+ actorSystem.set(system);
+ return system;
+ }
+
+
+ private static final Config readAkkaConfiguration() {
+ File defaultConfigFile = new File(AKKA_CONF_PATH);
+ Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+ return ConfigFactory.parseFile(defaultConfigFile);
+ }
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+ if (isMetricsCaptureEnabled()) {
+ getContext().become(new MeteringBehavior(this));
+ }
}
private static Map<String, String> mapPeerAddresses(
ActorRef transactionChain = getContext().actorOf(
ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
- getSelf());
+ getSelf());
+ }
+
+ private boolean isMetricsCaptureEnabled(){
+ CommonConfig config = new CommonConfig(getContext().system().settings().config());
+ return config.isMetricCaptureEnabled();
}
@Override protected void applyState(ActorRef clientActor, String identifier,
import akka.japi.Creator;
import akka.japi.Function;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
* <li> Monitor the cluster members and store their addresses
* <ul>
*/
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedActorWithMetering {
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
odl-cluster-data {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+ metric-capture-enabled = true
+
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
cluster {
range "1..max";
}
}
-
+
typedef operation-timeout-type {
type uint16 {
range "5..max";
}
}
-
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
type non-zero-uint16-type;
description "The maximum queue size for each shard's data store executor.";
}
-
+
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
type non-zero-uint16-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
}
-
+
leaf operation-timeout-in-seconds {
default 5;
type operation-timeout-type;
description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
}
+
+ leaf enable-metric-capture {
+ default false;
+ type boolean;
+ description "Enable or disable metric capture.";
+ }
+
+ leaf bounded-mailbox-capacity {
+ default 1000;
+ type non-zero-uint16-type;
+ description "Max queue size that an actor's mailbox can reach";
+ }
}
-
+
// Augments the 'configuration' choice node under modules/module.
augment "/config:modules/config:module/config:configuration" {
case distributed-config-datastore-provider {
}
}
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ </dependency>
<!-- SAL Dependencies -->
<dependency>
package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
+import org.opendaylight.controller.cluster.common.actor.DefaultAkkaConfigurationReader;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
import org.opendaylight.controller.sal.core.api.Broker;
import org.osgi.framework.BundleContext;
@Override
public java.lang.AutoCloseable createInstance() {
Broker broker = getDomBrokerDependency();
- return RemoteRpcProviderFactory.createInstance(broker, bundleContext);
+
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(getActorSystemName())
+ .metricCaptureEnabled(getEnableMetricCapture())
+ .mailboxCapacity(getBoundedMailboxCapacity())
+ .withConfigReader(new DefaultAkkaConfigurationReader())
+ .build();
+
+ return RemoteRpcProviderFactory.createInstance(broker, bundleContext, config);
}
public void setBundleContext(BundleContext bundleContext) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
-
-public abstract class AbstractUntypedActor extends UntypedActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
-
-
- public AbstractUntypedActor(){
- LOG.debug("Actor created {}", getSelf());
- getContext().
- system().
- actorSelection("user/termination-monitor").
- tell(new Monitor(getSelf()), getSelf());
- }
-
- @Override public void onReceive(Object message) throws Exception {
- LOG.debug("Received message {}", message);
- handleReceive(message);
- LOG.debug("Done handling message {}", message);
- }
-
- protected abstract void handleReceive(Object message) throws Exception;
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-public class ActorConstants {
- public static final String RPC_BROKER = "rpc-broker";
- public static final String RPC_REGISTRY = "rpc-registry";
- public static final String RPC_MANAGER = "rpc";
-
- public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker";
- public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry";
- public static final String RPC_MANAGER_PATH = "/user/rpc";
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorSystem;
-import akka.osgi.BundleDelegatingClassLoader;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.BundleContext;
-
-
-public class ActorSystemFactory {
-
- public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc";
- public static final String CONFIGURATION_NAME = "odl-cluster-rpc";
-
- private static volatile ActorSystem actorSystem = null;
-
- public static final ActorSystem getInstance(){
- return actorSystem;
- }
-
- /**
- * This method should be called only once during initialization
- *
- * @param bundleContext
- */
- public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) {
- if(actorSystem == null) {
- // Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
- synchronized (ActorSystemFactory.class) {
- // Double check
- if (actorSystem == null) {
- ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
- akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader);
- actorSystem = system;
- }
- }
- } else {
- throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system");
- }
- }
-
-}
package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.ExecutionContext;
import java.util.Collections;
import java.util.Set;
+import static akka.pattern.Patterns.ask;
+
public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
private final ActorRef rpcBroker;
private final SchemaContext schemaContext;
+ private final RemoteRpcProviderConfig config;
- public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+ public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) {
this.rpcBroker = rpcBroker;
this.schemaContext = schemaContext;
+ this.config = config;
}
@Override
final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
- scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
- new Timeout(ActorUtil.ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
- private final ActorSystem actorSystem;
private final RpcProvisionRegistry rpcProvisionRegistry;
+
+ private ActorSystem actorSystem;
private Broker.ProviderSession brokerSession;
private SchemaContext schemaContext;
private ActorRef rpcManager;
+ private RemoteRpcProviderConfig config;
public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
this.actorSystem = actorSystem;
this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
}
@Override
public void close() throws Exception {
- this.actorSystem.shutdown();
+ if (this.actorSystem != null)
+ this.actorSystem.shutdown();
}
@Override
}
private void start() {
- LOG.info("Starting all rpc listeners and actors.");
- // Create actor to handle and sync routing table in cluster
+ LOG.info("Starting remote rpc service...");
+
SchemaService schemaService = brokerSession.getService(SchemaService.class);
schemaContext = schemaService.getGlobalContext();
- rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER);
+ rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
+ config.getRpcManagerName());
- LOG.debug("Rpc actors are created.");
+ LOG.debug("rpc manager started");
}
-
@Override
public void onGlobalContextUpdated(SchemaContext schemaContext) {
this.schemaContext = schemaContext;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RemoteRpcProviderConfig extends CommonConfig {
+
+ protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
+ protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
+ protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
+ protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
+ protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
+ protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
+ protected static final String TAG_ASK_DURATION = "ask-duration";
+ private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+
+ //locally cached values
+ private Timeout cachedAskDuration;
+ private FiniteDuration cachedGossipTickInterval;
+
+ public RemoteRpcProviderConfig(Config config){
+ super(config);
+ }
+
+ public String getRpcBrokerName(){
+ return get().getString(TAG_RPC_BROKER_NAME);
+ }
+
+ public String getRpcRegistryName(){
+ return get().getString(TAG_RPC_REGISTRY_NAME);
+ }
+
+ public String getRpcManagerName(){
+ return get().getString(TAG_RPC_MGR_NAME);
+ }
+
+ public String getRpcBrokerPath(){
+ return get().getString(TAG_RPC_BROKER_PATH);
+ }
+
+ public String getRpcRegistryPath(){
+ return get().getString(TAG_RPC_REGISTRY_PATH);
+
+ }
+
+ public String getRpcManagerPath(){
+ return get().getString(TAG_RPC_MGR_PATH);
+ }
+
+
+ public Timeout getAskDuration(){
+ if (cachedAskDuration != null){
+ return cachedAskDuration;
+ }
+
+ cachedAskDuration = new Timeout(new FiniteDuration(
+ get().getDuration(TAG_ASK_DURATION, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS));
+
+ return cachedAskDuration;
+ }
+
+ public FiniteDuration getGossipTickInterval(){
+ if (cachedGossipTickInterval != null) {
+ return cachedGossipTickInterval;
+ }
+
+ cachedGossipTickInterval = new FiniteDuration(
+ get().getDuration(TAG_GOSSIP_TICK_INTERVAL, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+ return cachedGossipTickInterval;
+ }
+
+ public static class Builder extends CommonConfig.Builder<Builder>{
+
+ public Builder(String actorSystemName){
+ super(actorSystemName);
+
+ //Actor names
+ configHolder.put(TAG_RPC_BROKER_NAME, "broker");
+ configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
+ configHolder.put(TAG_RPC_MGR_NAME, "rpc");
+
+ //Actor paths
+ configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker");
+ configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry");
+ configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc");
+
+ //durations
+ configHolder.put(TAG_ASK_DURATION, "15s");
+ configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
+
+ }
+
+ public RemoteRpcProviderConfig build(){
+ return new RemoteRpcProviderConfig(merge());
+ }
+ }
+
+
+}
package org.opendaylight.controller.remote.rpc;
-
-import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader;
+import akka.actor.ActorSystem;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.Config;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemoteRpcProviderFactory {
- public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProviderFactory.class);
+
+ public static RemoteRpcProvider createInstance(
+ final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
- ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader());
RemoteRpcProvider rpcProvider =
- new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
+ new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker);
+
broker.registerProvider(rpcProvider);
return rpcProvider;
}
+
+ private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){
+
+ // Create an OSGi bundle classloader for actor system
+ BundleDelegatingClassLoader classLoader =
+ new BundleDelegatingClassLoader(bundleContext.getBundle(),
+ Thread.currentThread().getContextClassLoader());
+
+ Config actorSystemConfig = config.get();
+ LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+
+ if (config.isMetricCaptureEnabled()) {
+ LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
+ config.getActorSystemName());
+ }
+
+ return ActorSystem.create(config.getActorSystemName(), actorSystemConfig, classLoader);
+ }
}
package org.opendaylight.controller.remote.rpc;
-import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Pair;
-import akka.util.Timeout;
-
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
+import static akka.pattern.Patterns.ask;
+
/**
* Actor to initiate execution of remote RPC on other nodes of the cluster.
*/
private final Broker.ProviderSession brokerSession;
private final ActorRef rpcRegistry;
private final SchemaContext schemaContext;
+ private final RemoteRpcProviderConfig config;
private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
SchemaContext schemaContext) {
this.brokerSession = brokerSession;
this.rpcRegistry = rpcRegistry;
this.schemaContext = schemaContext;
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
null, msg.getRpc(), msg.getIdentifier());
RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
- scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
- new Timeout(ActorUtil.LOCAL_ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
final ActorRef sender = getSender();
final ActorRef self = self();
ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
schemaContext), msg.getRpc());
- scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
- new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+ scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
import akka.actor.SupervisorStrategy;
import akka.japi.Creator;
import akka.japi.Function;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
+
import java.util.Set;
/**
private ActorRef rpcBroker;
private ActorRef rpcRegistry;
private final Broker.ProviderSession brokerSession;
+ private final RemoteRpcProviderConfig config;
private RpcListener rpcListener;
private RoutedRpcListener routeChangeListener;
private RemoteRpcImplementation rpcImplementation;
private final RpcProvisionRegistry rpcProvisionRegistry;
private RpcManager(SchemaContext schemaContext,
- Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
+ Broker.ProviderSession brokerSession,
+ RpcProvisionRegistry rpcProvisionRegistry) {
this.schemaContext = schemaContext;
this.brokerSession = brokerSession;
this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
createRpcActors();
startListeners();
public static Props props(final SchemaContext schemaContext,
- final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
+ final Broker.ProviderSession brokerSession,
+ final RpcProvisionRegistry rpcProvisionRegistry) {
return Props.create(new Creator<RpcManager>() {
@Override
public RpcManager create() throws Exception {
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
- Config conf = ConfigFactory.load();
-
rpcRegistry =
getContext().actorOf(Props.create(RpcRegistry.class).
- withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
+ withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
rpcBroker =
getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
- withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
+ withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
rpcRegistry.tell(localRouter, self());
rpcListener = new RpcListener(rpcRegistry);
routeChangeListener = new RoutedRpcListener(rpcRegistry);
- rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+ rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
brokerSession.addRpcRegistrationListener(rpcListener);
rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
+import org.opendaylight.controller.cluster.common.actor.Monitor;
public class TerminationMonitor extends UntypedActor{
protected final LoggingAdapter LOG =
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Pair;
import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import scala.concurrent.Future;
import java.util.Map;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends UntypedActor {
+public class RpcRegistry extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
*/
private ActorRef localRouter;
+ private RemoteRpcProviderConfig config;
+
public RpcRegistry() {
bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
+ this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
log.info("Bucket store path = {}", bucketStore.path().toString());
}
this.bucketStore = bucketStore;
}
- @Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: message [{}]", message);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
if (message instanceof SetLocalRouter)
Preconditions.checkState(localRouter != null, "Router must be set first");
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
private void receiveGetRouter(FindRouters msg) {
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
}
* @param routeId
* @return
*/
- private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Messages.FindRoutersReply createReplyWithRouters(
+ Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
* @param sender client who asked to find the routers.
* @return
*/
- private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+ private Mapper<Object, Void> getMapperToGetRouter(
+ final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
return new Mapper<Object, Void>() {
@Override
public Void apply(Object replyMessage) {
public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
- private Long version = System.currentTimeMillis();;
+ private Long version = System.currentTimeMillis();
private T data;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.utils.ConditionalProbe;
import java.util.HashMap;
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends UntypedActor {
+public class BucketStore extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Bucket owned by the node
*/
- private BucketImpl localBucket = new BucketImpl();;
+ private BucketImpl localBucket = new BucketImpl();
/**
* Buckets ownded by other known nodes in the cluster
private ConditionalProbe probe;
+ private final RemoteRpcProviderConfig config;
+
+ public BucketStore(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
+
@Override
public void preStart(){
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
if ( provider instanceof ClusterActorRefProvider)
- getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
+ getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
}
- @Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
if (probe != null) {
probe.tell(message, getSelf());
}
receiveGetLocalBucket();
} else if (message instanceof GetBucketsByMembers) {
receiveGetBucketsByMembers(
- ((GetBucketsByMembers) message).getMembers());
+ ((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
receiveUpdateRemoteBuckets(
- ((UpdateRemoteBuckets) message).getBuckets());
+ ((UpdateRemoteBuckets) message).getBuckets());
} else {
log.debug("Unhandled message [{}]", message);
unhandled(message);
}
-
}
/**
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
*
*/
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Boolean autoStartGossipTicks = true;
- public Gossiper(){}
+ private RemoteRpcProviderConfig config;
+
+ public Gossiper(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
/**
* Helpful for testing
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- ActorUtil.GOSSIP_TICK_INTERVAL, //interval
+ config.getGossipTickInterval(), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick)
receiveGossipTick();
- //Message from remote gossiper with its bucket versions
+ //Message from remote gossiper with its bucket versions
else if (message instanceof GossipStatus)
receiveGossipStatus((GossipStatus) message);
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
+ //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+ //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+ //message with its local versions
else if (message instanceof GossipEnvelope)
receiveGossip((GossipEnvelope) message);
void receiveGossipTick(){
if (clusterMembers.size() == 0) return; //no members to send gossip status to
- Address remoteMemberToGossipTo = null;
+ Address remoteMemberToGossipTo;
if (clusterMembers.size() == 1)
remoteMemberToGossipTo = clusterMembers.get(0);
final ActorRef sender = getSender();
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
//Get local status from bucket store and send to remote
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
//Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
localIsOlder.add(address);
else if (localVersions.get(address) > remoteVersions.get(address))
localIsNewer.add(address);
- else
- continue;
}
if (!localIsOlder.isEmpty())
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-package org.opendaylight.controller.remote.rpc.utils;
-
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class ActorUtil {
- public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
- public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
- public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
- public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
- public static final String MAILBOX = "bounded-mailbox";
-}
odl-cluster-rpc {
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
import config { prefix config; revision-date 2013-04-05; }
import opendaylight-md-sal-dom {prefix dom;}
-
+
description
"This module contains the base YANG definitions for
the remote routed rpc";
-
+
revision "2014-07-07" {
description
"Initial revision";
augment "/config:modules/config:module/config:configuration" {
case remote-rpc-connector {
when "/config:modules/config:module/config:type = 'remote-rpc-connector'";
-
+
container dom-broker {
uses config:service-ref {
refine type {
}
}
}
+
+ leaf enable-metric-capture {
+ default false;
+ type boolean;
+ description "Enable or disable metric capture.";
+ }
+
+ leaf actor-system-name {
+ default odl-cluster-rpc;
+ type string;
+ description "Name by which actor system is identified. Its also used to find relevant configuration";
+ }
+
+ leaf bounded-mailbox-capacity {
+ default 1000;
+ type uint16;
+ description "Max queue size that an actor's mailbox can reach";
+ }
}
}
package org.opendaylight.controller.remote.rpc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
-import com.google.common.collect.ImmutableList;
-import com.typesafe.config.ConfigFactory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* Base class for RPC tests.
@BeforeClass
public static void setup() throws InterruptedException {
- node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
- node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
}
@AfterClass
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorSystem;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ActorSystemFactoryTest {
- ActorSystem system = null;
-
- @Test
- public void testActorSystemCreation(){
- BundleContext context = mock(BundleContext.class);
- when(context.getBundle()).thenReturn(mock(Bundle.class));
-
- AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class);
- when(reader.read()).thenReturn(ConfigFactory.load());
-
- ActorSystemFactory.createInstance(context, reader);
- system = ActorSystemFactory.getInstance();
- Assert.assertNotNull(system);
- // Check illegal state exception
-
- try {
- ActorSystemFactory.createInstance(context, reader);
- fail("Illegal State exception should be thrown, while creating actor system second time");
- } catch (IllegalStateException e) {
- }
- }
-
- @After
- public void cleanup() throws InterruptedException {
- if(system != null) {
- system.shutdown();
- }
- }
-}
package org.opendaylight.controller.remote.rpc;
-import static org.junit.Assert.assertEquals;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.controller.xml.codec.XmlUtils;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import akka.testkit.JavaTestKit;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.util.concurrent.ListenableFuture;
+import static org.junit.Assert.assertEquals;
/***
* Unit tests for RemoteRpcImplementation.
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
final CompositeNode input = makeRPCInput("foo");
final CompositeNode output = makeRPCOutput("bar");
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
QName instanceQName = new QName(new URI("ns"), "instance");
YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
final CompositeNode input = makeRPCInput("foo");
final AtomicReference<AssertionError> assertError = new AtomicReference<>();
try {
RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
- probeReg1.getRef(), schemaContext);
+ probeReg1.getRef(), schemaContext, getConfig());
final CompositeNode input = makeRPCInput("foo");
return invokeRpcMsg;
}
+
+ private RemoteRpcProviderConfig getConfig(){
+ return new RemoteRpcProviderConfig.Builder("unit-test").build();
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class RemoteRpcProviderConfigTest {
+
+ @Test
+ public void testConfigDefaults() {
+
+ Config c = ConfigFactory.parseFile(new File("application.conf"));
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
+
+ //Assert on configurations from common config
+ Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
+ Assert.assertNotNull(config.getMailBoxCapacity());
+ Assert.assertNotNull(config.getMailBoxName());
+ Assert.assertNotNull(config.getMailBoxPushTimeout());
+
+ //rest of the configurations should be set
+ Assert.assertNotNull(config.getActorSystemName());
+ Assert.assertNotNull(config.getRpcBrokerName());
+ Assert.assertNotNull(config.getRpcBrokerPath());
+ Assert.assertNotNull(config.getRpcManagerName());
+ Assert.assertNotNull(config.getRpcManagerPath());
+ Assert.assertNotNull(config.getRpcRegistryName());
+ Assert.assertNotNull(config.getRpcRegistryPath());
+ Assert.assertNotNull(config.getAskDuration());
+ Assert.assertNotNull(config.getGossipTickInterval());
+
+
+
+ }
+
+ @Test
+ public void testConfigCustomizations() {
+
+ AkkaConfigurationReader reader = new TestConfigReader();
+
+ final int expectedCapacity = 100;
+ String timeOutVal = "10ms";
+ FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS);
+
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test")
+ .metricCaptureEnabled(true)//enable metric capture
+ .mailboxCapacity(expectedCapacity)
+ .mailboxPushTimeout(timeOutVal)
+ .withConfigReader(reader)
+ .build();
+
+ Assert.assertTrue(config.isMetricCaptureEnabled());
+ Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+ Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+ //Now check this config inside an actor
+ ActorSystem system = ActorSystem.create("unit-test", config.get());
+ TestActorRef<ConfigTestActor> configTestActorTestActorRef =
+ TestActorRef.create(system, Props.create(ConfigTestActor.class));
+
+ ConfigTestActor actor = configTestActorTestActorRef.underlyingActor();
+ Config actorConfig = actor.getConfig();
+
+ config = new RemoteRpcProviderConfig(actorConfig);
+
+ Assert.assertTrue(config.isMetricCaptureEnabled());
+ Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+ Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+ }
+
+ public static class ConfigTestActor extends UntypedActor {
+
+ private Config actorSystemConfig;
+
+ public ConfigTestActor() {
+ this.actorSystemConfig = getContext().system().settings().config();
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ }
+
+ /**
+ * Only for testing. NEVER expose actor's internal state like this.
+ *
+ * @return
+ */
+ public Config getConfig() {
+ return actorSystemConfig;
+ }
+ }
+
+ public static class TestConfigReader implements AkkaConfigurationReader {
+
+ @Override
+ public Config read() {
+ return ConfigFactory.parseResources("application.conf");
+
+ }
+ }
+}
\ No newline at end of file
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
-import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.Config;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
public class RemoteRpcProviderTest {
static ActorSystem system;
-
+ static RemoteRpcProviderConfig moduleConfig;
@BeforeClass
public static void setup() throws InterruptedException {
- system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+ moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+ Config config = moduleConfig.get();
+ system = ActorSystem.create("odl-cluster-rpc", config);
+
}
@AfterClass
SchemaService schemaService = mock(SchemaService.class);
when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
when(session.getService(SchemaService.class)).thenReturn(schemaService);
+
rpcProvider.onSessionInitiated(session);
- ActorRef actorRef = Await.result(system.actorSelection(ActorConstants.RPC_MANAGER_PATH).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
- Duration.create(2, TimeUnit.SECONDS));
- Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH));
+
+ ActorRef actorRef = Await.result(
+ system.actorSelection(
+ moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
+ Duration.create(2, TimeUnit.SECONDS));
+
+ Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
}
}
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import com.google.common.base.Predicate;
-import com.typesafe.config.ConfigFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
@BeforeClass
public static void setup() throws InterruptedException {
- node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
- node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
- node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+ RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+ node3 = ActorSystem.create("opendaylight-rpc", config3.get());
}
@AfterClass
new ConditionalProbe(probe.getRef(), new Predicate() {
@Override
public boolean apply(@Nullable Object input) {
- return clazz.equals(input.getClass());
+ if (input != null)
+ return clazz.equals(input.getClass());
+ else
+ return false;
}
});
odl-cluster-rpc{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
}
unit-test{
akka {
- loglevel = "INFO"
+ loglevel = "DEBUG"
#loggers = ["akka.event.slf4j.Slf4jLogger"]
}
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ #mailbox-capacity is specified in config subsystem
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
memberA{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
}
memberB{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
}
memberC{
bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}