From: Abhishek Kumar Date: Wed, 27 Aug 2014 20:07:30 +0000 (-0700) Subject: Metrics and Configuration X-Git-Tag: release/helium~115 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=25b805c6685467f561506dbb5187a744fc12096b Metrics and Configuration 1. Adds a new abstract class AbstractMeteredUntypedActored that extends AbstractUntypedActor. This adds metrics capture capability which can be turned on using Config Subsystem. By default its turned off. 2. Updates Shard actor; adds metrics capture capability which can be turned on via Config Subsystem. 3. In remote-rpc-connector module, we can now pass configuration obtained from config subsystem to actor system so that its available to all actors. This obviates the need to manually pass the configuration via constructors or other *Constant or *Util classes. It brings all configuration items in the module at one place and makes it easier to move them to config subsystem, if its required. 4. In spirit of DRY, moves common code to clustering-commons 5. Minor code inspection fixes. 6. Makes mailbox-capacity configurable via config subsystem. Patch 9: Adds a new behaviour (MeteringBehavior.java). AbstractUntypedActorWithMetering and Shard actor exhibit this behavior current. This patch also refactors unified configuration (config subsystem + typesafe config) pieces. Note that in subsequent commits distributed-datastore will have its configuration added to actor system configuration. Also few more configuration items in remote-rpc-connector will go to config subsystem. I wanted to limit the amount of changes going into this already large commit. Change-Id: I383ec813c16ed09ed0e68ee59179f454c0d174cf Signed-off-by: Abhishek Kumar --- diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml index a3619ec4d2..d12f867ac5 100644 --- a/opendaylight/md-sal/sal-clustering-commons/pom.xml +++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml @@ -164,6 +164,14 @@ + + com.typesafe.akka + akka-osgi_${scala.version} + + + com.typesafe.akka + akka-actor_${scala.version} + com.google.guava guava diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java new file mode 100644 index 0000000000..3a66aa1181 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java @@ -0,0 +1,47 @@ +package org.opendaylight.controller.cluster.common.actor; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.util.HashMap; +import java.util.Map; + +public abstract class AbstractConfig implements UnifiedConfig { + + private Config config; + + public AbstractConfig(Config config){ + this.config = config; + } + + @Override + public Config get() { + return config; + } + + public static abstract class Builder{ + protected Map configHolder; + protected Config fallback; + + private final String actorSystemName; + + public Builder(String actorSystemName){ + Preconditions.checkArgument(actorSystemName != null, "Actor system name must not be null"); + this.actorSystemName = actorSystemName; + configHolder = new HashMap<>(); + } + + public T withConfigReader(AkkaConfigurationReader reader){ + fallback = reader.read().getConfig(actorSystemName); + return (T)this; + } + + protected Config merge(){ + if (fallback == null) + fallback = ConfigFactory.load().getConfig(actorSystemName); + + return ConfigFactory.parseMap(configHolder).withFallback(fallback); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java similarity index 81% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java index b258c4466a..ef56d02a2e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java @@ -6,31 +6,32 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore; +package org.opendaylight.controller.cluster.common.actor; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; -import org.opendaylight.controller.cluster.datastore.messages.Monitor; public abstract class AbstractUntypedActor extends UntypedActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); - public AbstractUntypedActor() { LOG.debug("Actor created {}", getSelf()); getContext(). system(). actorSelection("user/termination-monitor"). tell(new Monitor(getSelf()), getSelf()); + } @Override public void onReceive(Object message) throws Exception { - LOG.debug("Received message {}", message.getClass().getSimpleName()); + final String messageType = message.getClass().getSimpleName(); + LOG.debug("Received message {}", messageType); + handleReceive(message); - LOG.debug("Done handling message {}", - message.getClass().getSimpleName()); + + LOG.debug("Done handling message {}", messageType); } protected abstract void handleReceive(Object message) throws Exception; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java new file mode 100644 index 0000000000..5497f93c43 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + +/** + * Actor with its behaviour metered. Metering is enabled by configuration. + */ +public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor { + + public AbstractUntypedActorWithMetering() { + if (isMetricsCaptureEnabled()) + getContext().become(new MeteringBehavior(this)); + } + + private boolean isMetricsCaptureEnabled(){ + CommonConfig config = new CommonConfig(getContext().system().settings().config()); + return config.isMetricCaptureEnabled(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AkkaConfigurationReader.java similarity index 87% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AkkaConfigurationReader.java index 035ce9a203..c2e212860b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AkkaConfigurationReader.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.cluster.common.actor; import com.typesafe.config.Config; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java new file mode 100644 index 0000000000..0d139f9670 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class CommonConfig extends AbstractConfig { + + protected static final String TAG_ACTOR_SYSTEM_NAME = "actor-system-name"; + protected static final String TAG_METRIC_CAPTURE_ENABLED = "metric-capture-enabled"; + protected static final String TAG_MAILBOX_CAPACITY = "mailbox-capacity"; + protected static final String TAG_MAILBOX = "bounded-mailbox"; + protected static final String TAG_MAILBOX_PUSH_TIMEOUT = "mailbox-push-timeout-time"; + + //TODO: Ideally these defaults should go to reference.conf + // https://bugs.opendaylight.org/show_bug.cgi?id=1709 + private static final int DEFAULT_MAILBOX_CAPACITY = 1000; + private static final int DEFAULT_MAILBOX_PUSH_TIMEOUT = 100; + + //locally cached values + private FiniteDuration cachedMailBoxPushTimeout; + private Integer cachedMailBoxCapacity; + private Boolean cachedMetricCaptureEnableFlag; + + public CommonConfig(Config config) { + super(config); + } + + public String getActorSystemName() { + return get().getString(TAG_ACTOR_SYSTEM_NAME); + } + + public boolean isMetricCaptureEnabled(){ + if (cachedMetricCaptureEnableFlag != null){ + return cachedMetricCaptureEnableFlag; + } + + cachedMetricCaptureEnableFlag = get().hasPath(TAG_METRIC_CAPTURE_ENABLED) + ? get().getBoolean(TAG_METRIC_CAPTURE_ENABLED) + : false; + + return cachedMetricCaptureEnableFlag; + } + + public String getMailBoxName() { + return TAG_MAILBOX; + } + + public Integer getMailBoxCapacity() { + + if (cachedMailBoxCapacity != null) { + return cachedMailBoxCapacity; + } + + final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_CAPACITY).toString(); + cachedMailBoxCapacity = get().hasPath(PATH) + ? get().getInt(PATH) + : DEFAULT_MAILBOX_CAPACITY; + + return cachedMailBoxCapacity; + } + + public FiniteDuration getMailBoxPushTimeout() { + + if (cachedMailBoxPushTimeout != null) { + return cachedMailBoxPushTimeout; + } + + final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_PUSH_TIMEOUT).toString(); + + long timeout = get().hasPath(PATH) + ? get().getDuration(PATH, TimeUnit.NANOSECONDS) + : DEFAULT_MAILBOX_PUSH_TIMEOUT; + + cachedMailBoxPushTimeout = new FiniteDuration(timeout, TimeUnit.NANOSECONDS); + return cachedMailBoxPushTimeout; + } + + public static class Builder extends AbstractConfig.Builder{ + + public Builder(String actorSystemName) { + super(actorSystemName); + + //actor system config + configHolder.put(TAG_ACTOR_SYSTEM_NAME, actorSystemName); + + //config for bounded mailbox + configHolder.put(TAG_MAILBOX, new HashMap()); + } + + public T metricCaptureEnabled(boolean enabled) { + configHolder.put(TAG_METRIC_CAPTURE_ENABLED, String.valueOf(enabled)); + return (T)this; + } + + public T mailboxCapacity(int capacity) { + Preconditions.checkArgument(capacity > 0, "mailbox capacity must be >0"); + + Map boundedMailbox = (Map) configHolder.get(TAG_MAILBOX); + boundedMailbox.put(TAG_MAILBOX_CAPACITY, capacity); + return (T)this; + } + + public T mailboxPushTimeout(String timeout){ + Duration pushTimeout = Duration.create(timeout); + Preconditions.checkArgument(pushTimeout.isFinite(), "invalid value for mailbox push timeout"); + + Map boundedMailbox = (Map) configHolder.get(TAG_MAILBOX); + boundedMailbox.put(TAG_MAILBOX_PUSH_TIMEOUT, timeout); + return (T)this; + } + + public CommonConfig build() { + return new CommonConfig(merge()); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/DefaultAkkaConfigurationReader.java similarity index 93% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/DefaultAkkaConfigurationReader.java index a44d20ca39..2cdf15ad12 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/DefaultAkkaConfigurationReader.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.cluster.common.actor; import com.google.common.base.Preconditions; import com.typesafe.config.Config; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java new file mode 100644 index 0000000000..458f379f84 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.common.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.BoundedDequeBasedMailbox; +import akka.dispatch.MailboxType; +import akka.dispatch.ProducesMessageQueue; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.typesafe.config.Config; +import org.opendaylight.controller.cluster.reporting.MetricsReporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue { + + private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class); + + private MeteredMessageQueue queue; + private Integer capacity; + private FiniteDuration pushTimeOut; + private MetricRegistry registry; + + private final String QUEUE_SIZE = "q-size"; + + public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) { + + CommonConfig commonConfig = new CommonConfig(settings.config()); + this.capacity = commonConfig.getMailBoxCapacity(); + this.pushTimeOut = commonConfig.getMailBoxPushTimeout(); + + MetricsReporter reporter = MetricsReporter.getInstance(); + registry = reporter.getMetricsRegistry(); + } + + + @Override + public MeteredMessageQueue create(final scala.Option owner, scala.Option system) { + this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut); + monitorQueueSize(owner, this.queue); + return this.queue; + } + + private void monitorQueueSize(scala.Option owner, final MeteredMessageQueue monitoredQueue) { + if (owner.isEmpty()) { + return; //there's no actor to monitor + } + String actorName = owner.get().path().toStringWithoutAddress(); + String metricName = registry.name(actorName, QUEUE_SIZE); + + if (registry.getMetrics().containsKey(metricName)) + return; //already registered + + Gauge queueSize = getQueueSizeGuage(monitoredQueue); + registerQueueSizeMetric(metricName, queueSize); + } + + + public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue { + + public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) { + super(capacity, pushTimeOut); + } + } + + private Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){ + return new Gauge() { + @Override + public Integer getValue() { + return monitoredQueue.size(); + } + }; + } + + private void registerQueueSizeMetric(String metricName, Gauge metric){ + try { + registry.register(metricName,metric); + } catch (IllegalArgumentException e) { + LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e); + } + } +} + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java new file mode 100644 index 0000000000..d67d413d09 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + +import akka.actor.UntypedActor; +import akka.japi.Procedure; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.reporting.MetricsReporter; + +/** + * Represents behaviour that can be exhibited by actors of type {@link akka.actor.UntypedActor} + *

+ * This behaviour meters actor's default behaviour. It captures 2 metrics: + *

    + *
  • message processing rate of actor's receive block
  • + *
  • message processing rate by message type
  • + *
+ * + * The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter} + */ +public class MeteringBehavior implements Procedure { + + private final UntypedActor meteredActor; + + private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry(); + private final String MSG_PROCESSING_RATE = "msg-rate"; + + private String actorName; + private Timer msgProcessingTimer; + + /** + * + * @param actor whose behaviour needs to be metered + */ + public MeteringBehavior(UntypedActor actor){ + Preconditions.checkArgument(actor != null, "actor must not be null"); + + this.meteredActor = actor; + actorName = meteredActor.getSelf().path().toStringWithoutAddress(); + final String msgProcessingTime = MetricRegistry.name(actorName, MSG_PROCESSING_RATE); + msgProcessingTimer = METRICREGISTRY.timer(msgProcessingTime); + } + + /** + * Uses 2 timers to measure message processing rate. One for overall message processing rate and + * another to measure rate by message type. The timers are re-used if they were previously created. + *

+ * {@link com.codahale.metrics.MetricRegistry} maintains a reservoir for different timers where + * collected timings are kept. It exposes various metrics for each timer based on collected + * data. Eg: count of messages, 99, 95, 50... percentiles, max, mean etc. + *

+ * These metrics are exposed as JMX bean. + * + * @see + * http://dropwizard.github.io/metrics/manual/core/#timers + * + * @param message + * @throws Exception + */ + @Override + public void apply(Object message) throws Exception { + final String messageType = message.getClass().getSimpleName(); + + final String msgProcessingTimeByMsgType = + MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType); + + final Timer msgProcessingTimerByMsgType = METRICREGISTRY.timer(msgProcessingTimeByMsgType); + + //start timers + final Timer.Context context = msgProcessingTimer.time(); + final Timer.Context contextByMsgType = msgProcessingTimerByMsgType.time(); + + meteredActor.onReceive(message); + + //stop timers + contextByMsgType.stop(); + context.stop(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java similarity index 90% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java index 43a31ef7f2..b2a43c03d9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.messages; +package org.opendaylight.controller.cluster.common.actor; import akka.actor.ActorRef; @@ -14,7 +14,6 @@ public class Monitor { private final ActorRef actorRef; public Monitor(ActorRef actorRef){ - this.actorRef = actorRef; } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java new file mode 100644 index 0000000000..62b6055f2e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + +import com.typesafe.config.Config; + +/** + * Represents a unified view of configuration. + *

+ * It merges configuration from: + *

    + *
  • Config subsystem
  • + *
  • Akka configuration files
  • + *
+ * + * Configurations defined in config subsystem takes precedence. + */ +public interface UnifiedConfig { + + /** + * Returns an immutable instance of unified configuration + * @return + */ + public Config get(); +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/reporting/MetricsReporter.java similarity index 90% rename from opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/reporting/MetricsReporter.java index 5c3e11f8b8..0f227779dd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/reporting/MetricsReporter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.common.reporting; +package org.opendaylight.controller.cluster.reporting; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; @@ -21,7 +21,7 @@ import com.codahale.metrics.MetricRegistry; public class MetricsReporter implements AutoCloseable{ private final MetricRegistry METRICS_REGISTRY = new MetricRegistry(); - private final String DOMAIN = "org.opendaylight.controller"; + private final String DOMAIN = "org.opendaylight.controller.actor.metric"; public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java deleted file mode 100644 index c6d3625ac3..0000000000 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.common.actor; - -import akka.actor.ActorPath; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.BoundedDequeBasedMailbox; -import akka.dispatch.MailboxType; -import akka.dispatch.ProducesMessageQueue; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import org.opendaylight.controller.common.reporting.MetricsReporter; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.TimeUnit; - -public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue { - - private MeteredMessageQueue queue; - private Integer capacity; - private FiniteDuration pushTimeOut; - private ActorPath actorPath; - private MetricsReporter reporter; - - private final String QUEUE_SIZE = "queue-size"; - private final String CAPACITY = "mailbox-capacity"; - private final String TIMEOUT = "mailbox-push-timeout-time"; - private final Long DEFAULT_TIMEOUT = 10L; - - public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) { - Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" ); - this.capacity = config.getInt(CAPACITY); - Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0"); - - Long timeout = -1L; - if ( config.hasPath(TIMEOUT) ){ - timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS); - } else { - timeout = DEFAULT_TIMEOUT; - } - Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0"); - this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS); - - reporter = MetricsReporter.getInstance(); - } - - - @Override - public MeteredMessageQueue create(final scala.Option owner, scala.Option system) { - this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut); - monitorQueueSize(owner, this.queue); - return this.queue; - } - - private void monitorQueueSize(scala.Option owner, final MeteredMessageQueue monitoredQueue) { - if (owner.isEmpty()) { - return; //there's no actor to monitor - } - actorPath = owner.get().path(); - String actorInstanceId = Integer.toString(owner.get().hashCode()); - - MetricRegistry registry = reporter.getMetricsRegistry(); - String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE); - - if (registry.getMetrics().containsKey(actorName)) - return; //already registered - - registry.register(actorName, - new Gauge() { - @Override - public Integer getValue() { - return monitoredQueue.size(); - } - }); - } - - - public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue { - - public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) { - super(capacity, pushTimeOut); - } - } - -} - diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java new file mode 100644 index 0000000000..cd77ab211e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java @@ -0,0 +1,43 @@ +package org.opendaylight.controller.cluster.common.actor; + +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CommonConfigTest { + + @Test + public void testCommonConfigDefaults(){ + CommonConfig config = new CommonConfig.Builder<>("testsystem").build(); + + assertNotNull(config.getActorSystemName()); + assertNotNull(config.getMailBoxCapacity()); + assertNotNull(config.getMailBoxName()); + assertNotNull(config.getMailBoxPushTimeout()); + assertNotNull(config.isMetricCaptureEnabled()); + } + + @Test + public void testCommonConfigOverride(){ + + int expectedCapacity = 123; + String timeoutValue = "1000ms"; + CommonConfig config = new CommonConfig.Builder<>("testsystem") + .mailboxCapacity(expectedCapacity) + .mailboxPushTimeout(timeoutValue) + .metricCaptureEnabled(true) + .build(); + + assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); + + FiniteDuration expectedTimeout = FiniteDuration.create(1000, TimeUnit.MILLISECONDS); + assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); + + assertTrue(config.isMetricCaptureEnabled()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java similarity index 88% rename from opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java rename to opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java index 0f96c1002d..d33e79f533 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.common.actor; +package org.opendaylight.controller.cluster.common.actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -25,11 +25,13 @@ import java.util.concurrent.locks.ReentrantLock; public class MeteredBoundedMailboxTest { private static ActorSystem actorSystem; + private static CommonConfig config; private final ReentrantLock lock = new ReentrantLock(); @Before public void setUp() throws Exception { - actorSystem = ActorSystem.create("testsystem"); + config = new CommonConfig.Builder<>("testsystem").build(); + actorSystem = ActorSystem.create("testsystem", config.get()); } @After @@ -39,15 +41,14 @@ public class MeteredBoundedMailboxTest { } @Test - public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException { + public void shouldSendMsgToDeadLetterWhenQueueIsFull() throws InterruptedException { final JavaTestKit mockReceiver = new JavaTestKit(actorSystem); actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class); final FiniteDuration TWENTY_SEC = new FiniteDuration(20, TimeUnit.SECONDS); - String boundedMailBox = actorSystem.name() + ".bounded-mailbox"; - ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox), + ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()), "pingpongactor"); actorSystem.mailboxes().settings(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf index 0392dec3dd..a8a6513bb6 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf @@ -1,7 +1,7 @@ testsystem { bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 10 mailbox-push-timeout-time = 100ms } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf index 3481bae8ae..43696c7652 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf @@ -1,8 +1,5 @@ -testsystem { - bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf index 72da6304e5..fbb666a9ca 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf @@ -56,6 +56,9 @@ dom:dom-broker-osgi-registry dom-broker + true + odl-cluster-rpc + 1000 diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf index 5a2116b50f..f632b9cc83 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf @@ -1,10 +1,13 @@ odl-cluster-data { bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms - } + } + + metric-capture-enabled = true + akka { actor { provider = "akka.cluster.ClusterActorRefProvider" @@ -44,10 +47,13 @@ odl-cluster-data { odl-cluster-rpc { bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } + + metric-capture-enabled = true + akka { actor { provider = "akka.cluster.ClusterActorRefProvider" @@ -62,7 +68,7 @@ odl-cluster-rpc { } cluster { - seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"] + seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"] auto-down-unreachable-after = 10s } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java deleted file mode 100644 index b326d61fc6..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.osgi.BundleDelegatingClassLoader; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.osgi.framework.BundleContext; - -import java.io.File; - -public class ActorSystemFactory { - - public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf"; - public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data"; - public static final String CONFIGURATION_NAME = "odl-cluster-data"; - - private static volatile ActorSystem actorSystem = null; - - public static final ActorSystem getInstance(){ - return actorSystem; - } - - /** - * This method should be called only once during initialization - * - * @param bundleContext - */ - public static final ActorSystem createInstance(final BundleContext bundleContext) { - if(actorSystem == null) { - // Create an OSGi bundle classloader for actor system - BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), - Thread.currentThread().getContextClassLoader()); - synchronized (ActorSystemFactory.class) { - // Double check - - if (actorSystem == null) { - ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME, - ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader); - system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); - actorSystem = system; - } - } - } - - return actorSystem; - } - - - private static final Config readAkkaConfiguration(){ - File defaultConfigFile = new File(AKKA_CONF_PATH); - Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing"); - return ConfigFactory.parseFile(defaultConfigFile); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index b3283a18b1..f1c0df4c3a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -12,6 +12,7 @@ import akka.actor.Props; import akka.japi.Creator; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java index 818f73392d..342611298c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.PoisonPill; import akka.actor.Props; import akka.japi.Creator; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index 8739ed1966..a6f187d065 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -9,22 +9,60 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; - +import akka.actor.Props; +import akka.osgi.BundleDelegatingClassLoader; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.osgi.framework.BundleContext; +import java.io.File; +import java.util.concurrent.atomic.AtomicReference; + public class DistributedDataStoreFactory { + + public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf"; + public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data"; + public static final String CONFIGURATION_NAME = "odl-cluster-data"; + private static AtomicReference actorSystem = new AtomicReference<>(); + public static DistributedDataStore createInstance(String name, SchemaService schemaService, - DatastoreContext datastoreContext, BundleContext bundleContext) { + DatastoreContext datastoreContext, BundleContext bundleContext) { - ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext); + ActorSystem actorSystem = getOrCreateInstance(bundleContext); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); final DistributedDataStore dataStore = - new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), - config, datastoreContext ); + new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), + config, datastoreContext); + ShardStrategyFactory.setConfiguration(config); schemaService.registerSchemaContextListener(dataStore); return dataStore; } + + synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) { + + if (actorSystem.get() != null){ + return actorSystem.get(); + } + // Create an OSGi bundle classloader for actor system + BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), + Thread.currentThread().getContextClassLoader()); + + ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME, + ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader); + system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); + + actorSystem.set(system); + return system; + } + + + private static final Config readAkkaConfiguration() { + File defaultConfigFile = new File(AKKA_CONF_PATH); + Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing"); + return ConfigFactory.parseFile(defaultConfigFile); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7d570046d4..a1858f5f91 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -26,6 +26,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import org.opendaylight.controller.cluster.common.actor.CommonConfig; +import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -138,6 +140,9 @@ public class Shard extends RaftActor { shardMBean.setDataStoreExecutor(store.getDomStoreExecutor()); shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); + if (isMetricsCaptureEnabled()) { + getContext().become(new MeteringBehavior(this)); + } } private static Map mapPeerAddresses( @@ -406,7 +411,12 @@ public class Shard extends RaftActor { ActorRef transactionChain = getContext().actorOf( ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean)); getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(), - getSelf()); + getSelf()); + } + + private boolean isMetricsCaptureEnabled(){ + CommonConfig config = new CommonConfig(getContext().system().settings().config()); + return config.isMetricCaptureEnabled(); } @Override protected void applyState(ActorRef clientActor, String identifier, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 58cdefe537..13ecaa5619 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -18,6 +18,8 @@ import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; + import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; @@ -48,7 +50,7 @@ import java.util.Map; *
  • Monitor the cluster members and store their addresses *
      */ -public class ShardManager extends AbstractUntypedActor { +public class ShardManager extends AbstractUntypedActorWithMetering { // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 65f865b0c4..b810ed9575 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -16,6 +16,8 @@ import akka.japi.Creator; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; + import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 484bd54a07..8fe94cf468 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index 2dce6a1079..e3ae5dac7b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -18,6 +18,7 @@ import akka.japi.Creator; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf index c29f93bb07..6be6cda5d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf @@ -1,10 +1,13 @@ odl-cluster-data { bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } + + metric-capture-enabled = true + akka { loggers = ["akka.event.slf4j.Slf4jLogger"] cluster { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 82bc5e29bc..e19a76703f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -41,13 +41,13 @@ module distributed-datastore-provider { range "1..max"; } } - + typedef operation-timeout-type { type uint16 { range "5..max"; } } - + grouping data-store-properties { leaf max-shard-data-change-executor-queue-size { default 1000; @@ -72,20 +72,32 @@ module distributed-datastore-provider { type non-zero-uint16-type; description "The maximum queue size for each shard's data store executor."; } - + leaf shard-transaction-idle-timeout-in-minutes { default 10; type non-zero-uint16-type; description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs."; } - + leaf operation-timeout-in-seconds { default 5; type operation-timeout-type; description "The maximum amount of time for akka operations (remote or local) to complete before failing."; } + + leaf enable-metric-capture { + default false; + type boolean; + description "Enable or disable metric capture."; + } + + leaf bounded-mailbox-capacity { + default 1000; + type non-zero-uint16-type; + description "Max queue size that an actor's mailbox can reach"; + } } - + // Augments the 'configuration' choice node under modules/module. augment "/config:modules/config:module/config:configuration" { case distributed-config-datastore-provider { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 6851b1b72c..794b376af8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -15,7 +15,7 @@ akka { } } bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index 8d454c4bd6..0fb468be86 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -43,6 +43,11 @@ com.typesafe.akka akka-slf4j_${scala.version} + + + com.typesafe.akka + akka-persistence-experimental_${scala.version} + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java index 2be8ba47b9..c2e8125df2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java @@ -1,5 +1,7 @@ package org.opendaylight.controller.config.yang.config.remote_rpc_connector; +import org.opendaylight.controller.cluster.common.actor.DefaultAkkaConfigurationReader; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory; import org.opendaylight.controller.sal.core.api.Broker; import org.osgi.framework.BundleContext; @@ -22,7 +24,14 @@ public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.ya @Override public java.lang.AutoCloseable createInstance() { Broker broker = getDomBrokerDependency(); - return RemoteRpcProviderFactory.createInstance(broker, bundleContext); + + RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(getActorSystemName()) + .metricCaptureEnabled(getEnableMetricCapture()) + .mailboxCapacity(getBoundedMailboxCapacity()) + .withConfigReader(new DefaultAkkaConfigurationReader()) + .build(); + + return RemoteRpcProviderFactory.createInstance(broker, bundleContext, config); } public void setBundleContext(BundleContext bundleContext) { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java deleted file mode 100644 index 66593ae2a5..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - -import akka.actor.UntypedActor; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import org.opendaylight.controller.remote.rpc.messages.Monitor; - -public abstract class AbstractUntypedActor extends UntypedActor { - protected final LoggingAdapter LOG = - Logging.getLogger(getContext().system(), this); - - - public AbstractUntypedActor(){ - LOG.debug("Actor created {}", getSelf()); - getContext(). - system(). - actorSelection("user/termination-monitor"). - tell(new Monitor(getSelf()), getSelf()); - } - - @Override public void onReceive(Object message) throws Exception { - LOG.debug("Received message {}", message); - handleReceive(message); - LOG.debug("Done handling message {}", message); - } - - protected abstract void handleReceive(Object message) throws Exception; -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java deleted file mode 100644 index da0d62897a..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - - -public class ActorConstants { - public static final String RPC_BROKER = "rpc-broker"; - public static final String RPC_REGISTRY = "rpc-registry"; - public static final String RPC_MANAGER = "rpc"; - - public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker"; - public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry"; - public static final String RPC_MANAGER_PATH = "/user/rpc"; -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java deleted file mode 100644 index 6a442c57cc..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - -import akka.actor.ActorSystem; -import akka.osgi.BundleDelegatingClassLoader; -import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader; -import org.osgi.framework.BundleContext; - - -public class ActorSystemFactory { - - public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc"; - public static final String CONFIGURATION_NAME = "odl-cluster-rpc"; - - private static volatile ActorSystem actorSystem = null; - - public static final ActorSystem getInstance(){ - return actorSystem; - } - - /** - * This method should be called only once during initialization - * - * @param bundleContext - */ - public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) { - if(actorSystem == null) { - // Create an OSGi bundle classloader for actor system - BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), - Thread.currentThread().getContextClassLoader()); - synchronized (ActorSystemFactory.class) { - // Double check - if (actorSystem == null) { - ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME, - akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader); - actorSystem = system; - } - } - } else { - throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system"); - } - } - -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 7d7dbf0f3a..0f84abb22e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -1,42 +1,40 @@ package org.opendaylight.controller.remote.rpc; -import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.dispatch.OnComplete; -import akka.util.Timeout; - import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; - import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; -import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.ExecutionContext; import java.util.Collections; import java.util.Set; +import static akka.pattern.Patterns.ask; + public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation { private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); private final ActorRef rpcBroker; private final SchemaContext schemaContext; + private final RemoteRpcProviderConfig config; - public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) { + public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) { this.rpcBroker = rpcBroker; this.schemaContext = schemaContext; + this.config = config; } @Override @@ -63,8 +61,7 @@ public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefa final SettableFuture> listenableFuture = SettableFuture.create(); - scala.concurrent.Future future = ask(rpcBroker, rpcMsg, - new Timeout(ActorUtil.ASK_DURATION)); + scala.concurrent.Future future = ask(rpcBroker, rpcMsg, config.getAskDuration()); OnComplete onComplete = new OnComplete() { @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java index d088f2284d..8b4ce31d2e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -31,21 +31,25 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class); - private final ActorSystem actorSystem; private final RpcProvisionRegistry rpcProvisionRegistry; + + private ActorSystem actorSystem; private Broker.ProviderSession brokerSession; private SchemaContext schemaContext; private ActorRef rpcManager; + private RemoteRpcProviderConfig config; public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) { this.actorSystem = actorSystem; this.rpcProvisionRegistry = rpcProvisionRegistry; + this.config = new RemoteRpcProviderConfig(actorSystem.settings().config()); } @Override public void close() throws Exception { - this.actorSystem.shutdown(); + if (this.actorSystem != null) + this.actorSystem.shutdown(); } @Override @@ -60,17 +64,17 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext } private void start() { - LOG.info("Starting all rpc listeners and actors."); - // Create actor to handle and sync routing table in cluster + LOG.info("Starting remote rpc service..."); + SchemaService schemaService = brokerSession.getService(SchemaService.class); schemaContext = schemaService.getGlobalContext(); - rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER); + rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), + config.getRpcManagerName()); - LOG.debug("Rpc actors are created."); + LOG.debug("rpc manager started"); } - @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { this.schemaContext = schemaContext; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java new file mode 100644 index 0000000000..3f6d42d742 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import akka.util.Timeout; +import com.typesafe.config.Config; +import org.opendaylight.controller.cluster.common.actor.CommonConfig; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class RemoteRpcProviderConfig extends CommonConfig { + + protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name"; + protected static final String TAG_RPC_REGISTRY_NAME = "registry-name"; + protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name"; + protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path"; + protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path"; + protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path"; + protected static final String TAG_ASK_DURATION = "ask-duration"; + private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval"; + + //locally cached values + private Timeout cachedAskDuration; + private FiniteDuration cachedGossipTickInterval; + + public RemoteRpcProviderConfig(Config config){ + super(config); + } + + public String getRpcBrokerName(){ + return get().getString(TAG_RPC_BROKER_NAME); + } + + public String getRpcRegistryName(){ + return get().getString(TAG_RPC_REGISTRY_NAME); + } + + public String getRpcManagerName(){ + return get().getString(TAG_RPC_MGR_NAME); + } + + public String getRpcBrokerPath(){ + return get().getString(TAG_RPC_BROKER_PATH); + } + + public String getRpcRegistryPath(){ + return get().getString(TAG_RPC_REGISTRY_PATH); + + } + + public String getRpcManagerPath(){ + return get().getString(TAG_RPC_MGR_PATH); + } + + + public Timeout getAskDuration(){ + if (cachedAskDuration != null){ + return cachedAskDuration; + } + + cachedAskDuration = new Timeout(new FiniteDuration( + get().getDuration(TAG_ASK_DURATION, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)); + + return cachedAskDuration; + } + + public FiniteDuration getGossipTickInterval(){ + if (cachedGossipTickInterval != null) { + return cachedGossipTickInterval; + } + + cachedGossipTickInterval = new FiniteDuration( + get().getDuration(TAG_GOSSIP_TICK_INTERVAL, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + + return cachedGossipTickInterval; + } + + public static class Builder extends CommonConfig.Builder{ + + public Builder(String actorSystemName){ + super(actorSystemName); + + //Actor names + configHolder.put(TAG_RPC_BROKER_NAME, "broker"); + configHolder.put(TAG_RPC_REGISTRY_NAME, "registry"); + configHolder.put(TAG_RPC_MGR_NAME, "rpc"); + + //Actor paths + configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker"); + configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry"); + configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc"); + + //durations + configHolder.put(TAG_ASK_DURATION, "15s"); + configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms"); + + } + + public RemoteRpcProviderConfig build(){ + return new RemoteRpcProviderConfig(merge()); + } + } + + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java index 0e6b795c05..2e355d4f51 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java @@ -8,19 +8,43 @@ package org.opendaylight.controller.remote.rpc; - -import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader; +import akka.actor.ActorSystem; +import akka.osgi.BundleDelegatingClassLoader; +import com.typesafe.config.Config; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RemoteRpcProviderFactory { - public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){ + private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProviderFactory.class); + + public static RemoteRpcProvider createInstance( + final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){ - ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader()); RemoteRpcProvider rpcProvider = - new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker); + new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker); + broker.registerProvider(rpcProvider); return rpcProvider; } + + private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){ + + // Create an OSGi bundle classloader for actor system + BundleDelegatingClassLoader classLoader = + new BundleDelegatingClassLoader(bundleContext.getBundle(), + Thread.currentThread().getContextClassLoader()); + + Config actorSystemConfig = config.get(); + LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render()); + + if (config.isMetricCaptureEnabled()) { + LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.", + config.getActorSystemName()); + } + + return ActorSystem.create(config.getActorSystemName(), actorSystemConfig, classLoader); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 2aca655d26..6b02235dc7 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -8,25 +8,26 @@ package org.opendaylight.controller.remote.rpc; -import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Pair; -import akka.util.Timeout; - +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; -import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; -import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -36,16 +37,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; - import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.Future; +import static akka.pattern.Patterns.ask; + /** * Actor to initiate execution of remote RPC on other nodes of the cluster. */ @@ -56,12 +54,14 @@ public class RpcBroker extends AbstractUntypedActor { private final Broker.ProviderSession brokerSession; private final ActorRef rpcRegistry; private final SchemaContext schemaContext; + private final RemoteRpcProviderConfig config; private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext) { this.brokerSession = brokerSession; this.rpcRegistry = rpcRegistry; this.schemaContext = schemaContext; + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); } public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, @@ -85,8 +85,7 @@ public class RpcBroker extends AbstractUntypedActor { null, msg.getRpc(), msg.getIdentifier()); RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId); - scala.concurrent.Future future = ask(rpcRegistry, findMsg, - new Timeout(ActorUtil.LOCAL_ASK_DURATION)); + scala.concurrent.Future future = ask(rpcRegistry, findMsg, config.getAskDuration()); final ActorRef sender = getSender(); final ActorRef self = self(); @@ -129,8 +128,7 @@ public class RpcBroker extends AbstractUntypedActor { ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); - scala.concurrent.Future future = ask(logic.select(), executeMsg, - new Timeout(ActorUtil.REMOTE_ASK_DURATION)); + scala.concurrent.Future future = ask(logic.select(), executeMsg, config.getAskDuration()); OnComplete onComplete = new OnComplete() { @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index d4da226b9d..4ae9c2e4d0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -15,11 +15,9 @@ import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.japi.Creator; import akka.japi.Function; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.yangtools.yang.common.QName; @@ -27,6 +25,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; + import java.util.Set; /** @@ -43,16 +42,19 @@ public class RpcManager extends AbstractUntypedActor { private ActorRef rpcBroker; private ActorRef rpcRegistry; private final Broker.ProviderSession brokerSession; + private final RemoteRpcProviderConfig config; private RpcListener rpcListener; private RoutedRpcListener routeChangeListener; private RemoteRpcImplementation rpcImplementation; private final RpcProvisionRegistry rpcProvisionRegistry; private RpcManager(SchemaContext schemaContext, - Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) { + Broker.ProviderSession brokerSession, + RpcProvisionRegistry rpcProvisionRegistry) { this.schemaContext = schemaContext; this.brokerSession = brokerSession; this.rpcProvisionRegistry = rpcProvisionRegistry; + this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); createRpcActors(); startListeners(); @@ -60,7 +62,8 @@ public class RpcManager extends AbstractUntypedActor { public static Props props(final SchemaContext schemaContext, - final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) { + final Broker.ProviderSession brokerSession, + final RpcProvisionRegistry rpcProvisionRegistry) { return Props.create(new Creator() { @Override public RpcManager create() throws Exception { @@ -72,15 +75,13 @@ public class RpcManager extends AbstractUntypedActor { private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); - Config conf = ConfigFactory.load(); - rpcRegistry = getContext().actorOf(Props.create(RpcRegistry.class). - withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY); + withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext). - withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER); + withMailbox(config.getMailBoxName()), config.getRpcBrokerName()); RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); rpcRegistry.tell(localRouter, self()); @@ -91,7 +92,7 @@ public class RpcManager extends AbstractUntypedActor { rpcListener = new RpcListener(rpcRegistry); routeChangeListener = new RoutedRpcListener(rpcRegistry); - rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext); + rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config); brokerSession.addRpcRegistrationListener(rpcListener); rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java index a90f1e1ed2..abe2008c29 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java @@ -12,7 +12,7 @@ import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; -import org.opendaylight.controller.remote.rpc.messages.Monitor; +import org.opendaylight.controller.cluster.common.actor.Monitor; public class TerminationMonitor extends UntypedActor{ protected final LoggingAdapter LOG = diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 5109d31644..095d70926b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -18,9 +17,10 @@ import akka.japi.Option; import akka.japi.Pair; import akka.pattern.Patterns; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.sal.connector.api.RpcRouter; import scala.concurrent.Future; @@ -30,9 +30,9 @@ import java.util.List; import java.util.Map; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; @@ -45,7 +45,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ -public class RpcRegistry extends UntypedActor { +public class RpcRegistry extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); @@ -59,9 +59,11 @@ public class RpcRegistry extends UntypedActor { */ private ActorRef localRouter; + private RemoteRpcProviderConfig config; + public RpcRegistry() { bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); - + this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); log.info("Bucket store path = {}", bucketStore.path().toString()); } @@ -69,11 +71,9 @@ public class RpcRegistry extends UntypedActor { this.bucketStore = bucketStore; } - @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: message [{}]", message); + @Override + protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message if (message instanceof SetLocalRouter) @@ -108,7 +108,7 @@ public class RpcRegistry extends UntypedActor { Preconditions.checkState(localRouter != null, "Router must be set first"); - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis()); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } @@ -117,7 +117,7 @@ public class RpcRegistry extends UntypedActor { */ private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis()); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } @@ -130,7 +130,7 @@ public class RpcRegistry extends UntypedActor { private void receiveGetRouter(FindRouters msg) { final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis()); + Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration()); futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); } @@ -151,7 +151,8 @@ public class RpcRegistry extends UntypedActor { * @param routeId * @return */ - private Messages.FindRoutersReply createReplyWithRouters(Map buckets, RpcRouter.RouteIdentifier routeId) { + private Messages.FindRoutersReply createReplyWithRouters( + Map buckets, RpcRouter.RouteIdentifier routeId) { List> routers = new ArrayList<>(); Option> routerWithUpdateTime = null; @@ -184,7 +185,8 @@ public class RpcRegistry extends UntypedActor { * @param sender client who asked to find the routers. * @return */ - private Mapper getMapperToGetRouter(final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { + private Mapper getMapperToGetRouter( + final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { return new Mapper() { @Override public Void apply(Object replyMessage) { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index 3cdd924e85..4dac456dc4 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -11,7 +11,7 @@ import java.io.Serializable; public class BucketImpl> implements Bucket, Serializable { - private Long version = System.currentTimeMillis();; + private Long version = System.currentTimeMillis(); private T data; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index ff51f4fcfa..3de3fc00d0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -12,11 +12,11 @@ import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.cluster.ClusterActorRefProvider; import akka.event.Logging; import akka.event.LoggingAdapter; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.utils.ConditionalProbe; import java.util.HashMap; @@ -45,14 +45,14 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore extends UntypedActor { +public class BucketStore extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); /** * Bucket owned by the node */ - private BucketImpl localBucket = new BucketImpl();; + private BucketImpl localBucket = new BucketImpl(); /** * Buckets ownded by other known nodes in the cluster @@ -71,20 +71,24 @@ public class BucketStore extends UntypedActor { private ConditionalProbe probe; + private final RemoteRpcProviderConfig config; + + public BucketStore(){ + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + } + @Override public void preStart(){ ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); if ( provider instanceof ClusterActorRefProvider) - getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper"); + getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper"); } - @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); + @Override + protected void handleReceive(Object message) throws Exception { if (probe != null) { probe.tell(message, getSelf()); } @@ -100,17 +104,16 @@ public class BucketStore extends UntypedActor { receiveGetLocalBucket(); } else if (message instanceof GetBucketsByMembers) { receiveGetBucketsByMembers( - ((GetBucketsByMembers) message).getMembers()); + ((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { receiveUpdateRemoteBuckets( - ((UpdateRemoteBuckets) message).getBuckets()); + ((UpdateRemoteBuckets) message).getBuckets()); } else { log.debug("Unhandled message [{}]", message); unhandled(message); } - } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index f6ce5e55f3..85c6ebe26f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,7 +12,6 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; @@ -21,7 +20,8 @@ import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.pattern.Patterns; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -59,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go * */ -public class Gossiper extends UntypedActor { +public class Gossiper extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); @@ -79,7 +79,11 @@ public class Gossiper extends UntypedActor { private Boolean autoStartGossipTicks = true; - public Gossiper(){} + private RemoteRpcProviderConfig config; + + public Gossiper(){ + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + } /** * Helpful for testing @@ -106,7 +110,7 @@ public class Gossiper extends UntypedActor { if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - ActorUtil.GOSSIP_TICK_INTERVAL, //interval + config.getGossipTickInterval(), //interval getSelf(), //target new Messages.GossiperMessages.GossipTick(), //message getContext().dispatcher(), //execution context @@ -124,22 +128,19 @@ public class Gossiper extends UntypedActor { } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing if (message instanceof GossipTick) receiveGossipTick(); - //Message from remote gossiper with its bucket versions + //Message from remote gossiper with its bucket versions else if (message instanceof GossipStatus) receiveGossipStatus((GossipStatus) message); - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions + //Message from remote gossiper with buckets. This is usually in response to GossipStatus message + //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus + //message with its local versions else if (message instanceof GossipEnvelope) receiveGossip((GossipEnvelope) message); @@ -196,7 +197,7 @@ public class Gossiper extends UntypedActor { void receiveGossipTick(){ if (clusterMembers.size() == 0) return; //no members to send gossip status to - Address remoteMemberToGossipTo = null; + Address remoteMemberToGossipTo; if (clusterMembers.size() == 1) remoteMemberToGossipTo = clusterMembers.get(0); @@ -229,7 +230,7 @@ public class Gossiper extends UntypedActor { final ActorRef sender = getSender(); Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis()); + Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); @@ -271,7 +272,7 @@ public class Gossiper extends UntypedActor { void sendGossipTo(final ActorRef remote, final Set
      addresses){ Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis()); + Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration()); futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); } @@ -284,7 +285,7 @@ public class Gossiper extends UntypedActor { //Get local status from bucket store and send to remote Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis()); + Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); //Find gossiper on remote system ActorSelection remoteRef = getContext().system().actorSelection( @@ -382,8 +383,6 @@ public class Gossiper extends UntypedActor { localIsOlder.add(address); else if (localVersions.get(address) > remoteVersions.get(address)) localIsNewer.add(address); - else - continue; } if (!localIsOlder.isEmpty()) diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java deleted file mode 100644 index e2baffa1b1..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - * - */ -package org.opendaylight.controller.remote.rpc.utils; - -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.TimeUnit; - -public class ActorUtil { - public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS); - public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS); - public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS); - public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS); - public static final String MAILBOX = "bounded-mailbox"; -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf index 266832a0ab..39ac991274 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf @@ -39,7 +39,7 @@ odl-cluster-data { odl-cluster-rpc { bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang b/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang index 08db5c0043..334d872c44 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang @@ -5,11 +5,11 @@ module remote-rpc-connector { import config { prefix config; revision-date 2013-04-05; } import opendaylight-md-sal-dom {prefix dom;} - + description "This module contains the base YANG definitions for the remote routed rpc"; - + revision "2014-07-07" { description "Initial revision"; @@ -25,7 +25,7 @@ module remote-rpc-connector { augment "/config:modules/config:module/config:configuration" { case remote-rpc-connector { when "/config:modules/config:module/config:type = 'remote-rpc-connector'"; - + container dom-broker { uses config:service-ref { refine type { @@ -34,6 +34,24 @@ module remote-rpc-connector { } } } + + leaf enable-metric-capture { + default false; + type boolean; + description "Enable or disable metric capture."; + } + + leaf actor-system-name { + default odl-cluster-rpc; + type string; + description "Name by which actor system is identified. Its also used to find relevant configuration"; + } + + leaf bounded-mailbox-capacity { + default 1000; + type uint16; + description "Max queue size that an actor's mailbox can reach"; + } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java index 8d886829aa..46406fd4fe 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java @@ -8,17 +8,10 @@ package org.opendaylight.controller.remote.rpc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.google.common.collect.ImmutableList; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -26,9 +19,9 @@ import org.mockito.Mockito; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.Node; import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; @@ -36,12 +29,16 @@ import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; -import com.google.common.collect.ImmutableList; -import com.typesafe.config.ConfigFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * Base class for RPC tests. @@ -70,8 +67,10 @@ public class AbstractRpcTest { @BeforeClass public static void setup() throws InterruptedException { - node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); - node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); + RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); + RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); + node1 = ActorSystem.create("opendaylight-rpc", config1.get()); + node2 = ActorSystem.create("opendaylight-rpc", config2.get()); } @AfterClass diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java deleted file mode 100644 index cd1cd91869..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc; - - -import akka.actor.ActorSystem; -import com.typesafe.config.ConfigFactory; -import junit.framework.Assert; -import org.junit.After; -import org.junit.Test; -import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class ActorSystemFactoryTest { - ActorSystem system = null; - - @Test - public void testActorSystemCreation(){ - BundleContext context = mock(BundleContext.class); - when(context.getBundle()).thenReturn(mock(Bundle.class)); - - AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class); - when(reader.read()).thenReturn(ConfigFactory.load()); - - ActorSystemFactory.createInstance(context, reader); - system = ActorSystemFactory.getInstance(); - Assert.assertNotNull(system); - // Check illegal state exception - - try { - ActorSystemFactory.createInstance(context, reader); - fail("Illegal State exception should be thrown, while creating actor system second time"); - } catch (IllegalStateException e) { - } - } - - @After - public void cleanup() throws InterruptedException { - if(system != null) { - system.shutdown(); - } - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java index 6c3a57b344..49451dd0db 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java @@ -8,27 +8,26 @@ package org.opendaylight.controller.remote.rpc; -import static org.junit.Assert.assertEquals; -import java.net.URI; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - +import akka.testkit.JavaTestKit; +import com.google.common.util.concurrent.ListenableFuture; import org.junit.Test; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import akka.testkit.JavaTestKit; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import com.google.common.util.concurrent.ListenableFuture; +import static org.junit.Assert.assertEquals; /*** * Unit tests for RemoteRpcImplementation. @@ -42,7 +41,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { final AtomicReference assertError = new AtomicReference<>(); try { RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext); + probeReg1.getRef(), schemaContext, getConfig()); final CompositeNode input = makeRPCInput("foo"); final CompositeNode output = makeRPCOutput("bar"); @@ -68,7 +67,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { final AtomicReference assertError = new AtomicReference<>(); try { RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext); + probeReg1.getRef(), schemaContext, getConfig()); QName instanceQName = new QName(new URI("ns"), "instance"); YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName); @@ -99,7 +98,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { final AtomicReference assertError = new AtomicReference<>(); try { RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext); + probeReg1.getRef(), schemaContext, getConfig()); final CompositeNode input = makeRPCInput("foo"); @@ -125,7 +124,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { final AtomicReference assertError = new AtomicReference<>(); try { RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation( - probeReg1.getRef(), schemaContext); + probeReg1.getRef(), schemaContext, getConfig()); final CompositeNode input = makeRPCInput("foo"); @@ -182,4 +181,8 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { return invokeRpcMsg; } + + private RemoteRpcProviderConfig getConfig(){ + return new RemoteRpcProviderConfig.Builder("unit-test").build(); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java new file mode 100644 index 0000000000..ae75252368 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.testkit.TestActorRef; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +public class RemoteRpcProviderConfigTest { + + @Test + public void testConfigDefaults() { + + Config c = ConfigFactory.parseFile(new File("application.conf")); + RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build(); + + //Assert on configurations from common config + Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default + Assert.assertNotNull(config.getMailBoxCapacity()); + Assert.assertNotNull(config.getMailBoxName()); + Assert.assertNotNull(config.getMailBoxPushTimeout()); + + //rest of the configurations should be set + Assert.assertNotNull(config.getActorSystemName()); + Assert.assertNotNull(config.getRpcBrokerName()); + Assert.assertNotNull(config.getRpcBrokerPath()); + Assert.assertNotNull(config.getRpcManagerName()); + Assert.assertNotNull(config.getRpcManagerPath()); + Assert.assertNotNull(config.getRpcRegistryName()); + Assert.assertNotNull(config.getRpcRegistryPath()); + Assert.assertNotNull(config.getAskDuration()); + Assert.assertNotNull(config.getGossipTickInterval()); + + + + } + + @Test + public void testConfigCustomizations() { + + AkkaConfigurationReader reader = new TestConfigReader(); + + final int expectedCapacity = 100; + String timeOutVal = "10ms"; + FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS); + + RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test") + .metricCaptureEnabled(true)//enable metric capture + .mailboxCapacity(expectedCapacity) + .mailboxPushTimeout(timeOutVal) + .withConfigReader(reader) + .build(); + + Assert.assertTrue(config.isMetricCaptureEnabled()); + Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); + Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); + + //Now check this config inside an actor + ActorSystem system = ActorSystem.create("unit-test", config.get()); + TestActorRef configTestActorTestActorRef = + TestActorRef.create(system, Props.create(ConfigTestActor.class)); + + ConfigTestActor actor = configTestActorTestActorRef.underlyingActor(); + Config actorConfig = actor.getConfig(); + + config = new RemoteRpcProviderConfig(actorConfig); + + Assert.assertTrue(config.isMetricCaptureEnabled()); + Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue()); + Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis()); + } + + public static class ConfigTestActor extends UntypedActor { + + private Config actorSystemConfig; + + public ConfigTestActor() { + this.actorSystemConfig = getContext().system().settings().config(); + } + + @Override + public void onReceive(Object message) throws Exception { + } + + /** + * Only for testing. NEVER expose actor's internal state like this. + * + * @return + */ + public Config getConfig() { + return actorSystemConfig; + } + } + + public static class TestConfigReader implements AkkaConfigurationReader { + + @Override + public Config read() { + return ConfigFactory.parseResources("application.conf"); + + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java index 8a7e4a0398..8b4599ca8c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java @@ -13,7 +13,7 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import com.typesafe.config.ConfigFactory; +import com.typesafe.config.Config; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -33,11 +33,14 @@ import static org.mockito.Mockito.when; public class RemoteRpcProviderTest { static ActorSystem system; - + static RemoteRpcProviderConfig moduleConfig; @BeforeClass public static void setup() throws InterruptedException { - system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); + moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build(); + Config config = moduleConfig.get(); + system = ActorSystem.create("odl-cluster-rpc", config); + } @AfterClass @@ -53,9 +56,14 @@ public class RemoteRpcProviderTest { SchemaService schemaService = mock(SchemaService.class); when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class)); when(session.getService(SchemaService.class)).thenReturn(schemaService); + rpcProvider.onSessionInitiated(session); - ActorRef actorRef = Await.result(system.actorSelection(ActorConstants.RPC_MANAGER_PATH).resolveOne(Duration.create(1, TimeUnit.SECONDS)), - Duration.create(2, TimeUnit.SECONDS)); - Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH)); + + ActorRef actorRef = Await.result( + system.actorSelection( + moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)), + Duration.create(2, TimeUnit.SECONDS)); + + Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath())); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index 83f52930b2..f6f720eed0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -9,12 +9,12 @@ import akka.actor.ChildActorPath; import akka.actor.Props; import akka.testkit.JavaTestKit; import com.google.common.base.Predicate; -import com.typesafe.config.ConfigFactory; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; import org.opendaylight.controller.sal.connector.api.RpcRouter; @@ -45,9 +45,12 @@ public class RpcRegistryTest { @BeforeClass public static void setup() throws InterruptedException { - node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA")); - node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB")); - node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC")); + RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); + RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build(); + RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build(); + node1 = ActorSystem.create("opendaylight-rpc", config1.get()); + node2 = ActorSystem.create("opendaylight-rpc", config2.get()); + node3 = ActorSystem.create("opendaylight-rpc", config3.get()); } @AfterClass @@ -204,7 +207,10 @@ public class RpcRegistryTest { new ConditionalProbe(probe.getRef(), new Predicate() { @Override public boolean apply(@Nullable Object input) { - return clazz.equals(input.getClass()); + if (input != null) + return clazz.equals(input.getClass()); + else + return false; } }); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf index 5c4af8d3da..8e310815fa 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf @@ -1,6 +1,6 @@ odl-cluster-rpc{ bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms } @@ -37,11 +37,12 @@ odl-cluster-rpc{ } unit-test{ akka { - loglevel = "INFO" + loglevel = "DEBUG" #loggers = ["akka.event.slf4j.Slf4jLogger"] } bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + #mailbox-capacity is specified in config subsystem mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms } @@ -49,7 +50,7 @@ unit-test{ memberA{ bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms } @@ -82,7 +83,7 @@ memberA{ } memberB{ bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms } @@ -116,7 +117,7 @@ memberB{ } memberC{ bounded-mailbox { - mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms }