Metrics and Configuration 65/10665/15
authorAbhishek Kumar <abhishk2@cisco.com>
Wed, 27 Aug 2014 20:07:30 +0000 (13:07 -0700)
committerAbhishek Kumar <abhishk2@cisco.com>
Tue, 9 Sep 2014 20:18:11 +0000 (20:18 +0000)
1. Adds a new abstract class AbstractMeteredUntypedActored that
   extends AbstractUntypedActor. This adds metrics capture
   capability which can be turned on using Config Subsystem.
   By default its turned off.

2. Updates Shard actor; adds metrics capture capability which can be
   turned on via Config Subsystem.

3. In remote-rpc-connector module, we can now pass configuration
   obtained from config subsystem to actor system so that its
   available to all actors. This obviates the need to manually pass
   the configuration via constructors or other *Constant or *Util
   classes. It brings all configuration items in the module at one
   place and makes it easier to move them to config subsystem, if
   its required.

4. In spirit of DRY, moves common code to clustering-commons

5. Minor code inspection fixes.

6. Makes mailbox-capacity configurable via config subsystem.

Patch 9:
Adds a new behaviour (MeteringBehavior.java).
AbstractUntypedActorWithMetering and Shard actor exhibit this
behavior current.
This patch also refactors unified configuration (config subsystem
+ typesafe config) pieces.

Note that in subsequent commits distributed-datastore will have
its configuration added to actor system configuration. Also few
more configuration items in remote-rpc-connector will go to
config subsystem. I wanted to limit the amount of changes going
into this already large commit.

Change-Id: I383ec813c16ed09ed0e68ee59179f454c0d174cf
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
57 files changed:
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java with 81% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AkkaConfigurationReader.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java with 87% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/DefaultAkkaConfigurationReader.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java with 93% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java with 90% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/reporting/MetricsReporter.java [moved from opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java with 90% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java [moved from opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java with 88% similarity]
opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf
opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf

index a3619ec..d12f867 100644 (file)
       </dependency>
 
 
+      <dependency>
+          <groupId>com.typesafe.akka</groupId>
+          <artifactId>akka-osgi_${scala.version}</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.typesafe.akka</groupId>
+          <artifactId>akka-actor_${scala.version}</artifactId>
+      </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java
new file mode 100644 (file)
index 0000000..3a66aa1
--- /dev/null
@@ -0,0 +1,47 @@
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractConfig implements UnifiedConfig {
+
+    private Config config;
+
+    public AbstractConfig(Config config){
+        this.config = config;
+    }
+
+    @Override
+    public Config get() {
+        return config;
+    }
+
+    public static abstract class Builder<T extends Builder>{
+        protected Map<String, Object> configHolder;
+        protected Config fallback;
+
+        private final String actorSystemName;
+
+        public Builder(String actorSystemName){
+            Preconditions.checkArgument(actorSystemName != null, "Actor system name must not be null");
+            this.actorSystemName = actorSystemName;
+            configHolder = new HashMap<>();
+        }
+
+        public T withConfigReader(AkkaConfigurationReader reader){
+            fallback = reader.read().getConfig(actorSystemName);
+            return (T)this;
+        }
+
+        protected Config merge(){
+            if (fallback == null)
+                fallback = ConfigFactory.load().getConfig(actorSystemName);
+
+            return ConfigFactory.parseMap(configHolder).withFallback(fallback);
+        }
+    }
+}
@@ -6,31 +6,32 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.Monitor;
 
 public abstract class AbstractUntypedActor extends UntypedActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-
     public AbstractUntypedActor() {
         LOG.debug("Actor created {}", getSelf());
         getContext().
             system().
             actorSelection("user/termination-monitor").
             tell(new Monitor(getSelf()), getSelf());
+
     }
 
     @Override public void onReceive(Object message) throws Exception {
-        LOG.debug("Received message {}", message.getClass().getSimpleName());
+        final String messageType = message.getClass().getSimpleName();
+        LOG.debug("Received message {}", messageType);
+
         handleReceive(message);
-        LOG.debug("Done handling message {}",
-            message.getClass().getSimpleName());
+
+        LOG.debug("Done handling message {}", messageType);
     }
 
     protected abstract void handleReceive(Object message) throws Exception;
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java
new file mode 100644 (file)
index 0000000..5497f93
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+/**
+ * Actor with its behaviour metered. Metering is enabled by configuration.
+ */
+public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor {
+
+    public AbstractUntypedActorWithMetering() {
+        if (isMetricsCaptureEnabled())
+            getContext().become(new MeteringBehavior(this));
+    }
+
+    private boolean isMetricsCaptureEnabled(){
+        CommonConfig config = new CommonConfig(getContext().system().settings().config());
+        return config.isMetricCaptureEnabled();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java
new file mode 100644 (file)
index 0000000..0d139f9
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CommonConfig extends AbstractConfig {
+
+    protected static final String TAG_ACTOR_SYSTEM_NAME = "actor-system-name";
+    protected static final String TAG_METRIC_CAPTURE_ENABLED = "metric-capture-enabled";
+    protected static final String TAG_MAILBOX_CAPACITY = "mailbox-capacity";
+    protected static final String TAG_MAILBOX = "bounded-mailbox";
+    protected static final String TAG_MAILBOX_PUSH_TIMEOUT = "mailbox-push-timeout-time";
+
+    //TODO: Ideally these defaults should go to reference.conf
+    // https://bugs.opendaylight.org/show_bug.cgi?id=1709
+    private static final int DEFAULT_MAILBOX_CAPACITY = 1000;
+    private static final int DEFAULT_MAILBOX_PUSH_TIMEOUT = 100;
+
+    //locally cached values
+    private FiniteDuration cachedMailBoxPushTimeout;
+    private Integer cachedMailBoxCapacity;
+    private Boolean cachedMetricCaptureEnableFlag;
+
+    public CommonConfig(Config config) {
+        super(config);
+    }
+
+    public String getActorSystemName() {
+        return get().getString(TAG_ACTOR_SYSTEM_NAME);
+    }
+
+    public boolean isMetricCaptureEnabled(){
+        if (cachedMetricCaptureEnableFlag != null){
+            return cachedMetricCaptureEnableFlag;
+        }
+
+        cachedMetricCaptureEnableFlag = get().hasPath(TAG_METRIC_CAPTURE_ENABLED)
+                ? get().getBoolean(TAG_METRIC_CAPTURE_ENABLED)
+                : false;
+
+        return cachedMetricCaptureEnableFlag;
+    }
+
+    public String getMailBoxName() {
+        return TAG_MAILBOX;
+    }
+
+    public Integer getMailBoxCapacity() {
+
+        if (cachedMailBoxCapacity != null) {
+            return cachedMailBoxCapacity;
+        }
+
+        final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_CAPACITY).toString();
+        cachedMailBoxCapacity = get().hasPath(PATH)
+                ? get().getInt(PATH)
+                : DEFAULT_MAILBOX_CAPACITY;
+
+        return cachedMailBoxCapacity;
+    }
+
+    public FiniteDuration getMailBoxPushTimeout() {
+
+        if (cachedMailBoxPushTimeout != null) {
+            return cachedMailBoxPushTimeout;
+        }
+
+        final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_PUSH_TIMEOUT).toString();
+
+        long timeout = get().hasPath(PATH)
+                ? get().getDuration(PATH, TimeUnit.NANOSECONDS)
+                : DEFAULT_MAILBOX_PUSH_TIMEOUT;
+
+        cachedMailBoxPushTimeout = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
+        return cachedMailBoxPushTimeout;
+    }
+
+    public static class Builder<T extends Builder> extends AbstractConfig.Builder<T>{
+
+        public Builder(String actorSystemName) {
+            super(actorSystemName);
+
+            //actor system config
+            configHolder.put(TAG_ACTOR_SYSTEM_NAME, actorSystemName);
+
+            //config for bounded mailbox
+            configHolder.put(TAG_MAILBOX, new HashMap<String, Object>());
+        }
+
+        public T metricCaptureEnabled(boolean enabled) {
+            configHolder.put(TAG_METRIC_CAPTURE_ENABLED, String.valueOf(enabled));
+            return (T)this;
+        }
+
+        public T mailboxCapacity(int capacity) {
+            Preconditions.checkArgument(capacity > 0, "mailbox capacity must be >0");
+
+            Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+            boundedMailbox.put(TAG_MAILBOX_CAPACITY, capacity);
+            return (T)this;
+        }
+
+        public T mailboxPushTimeout(String timeout){
+            Duration pushTimeout = Duration.create(timeout);
+            Preconditions.checkArgument(pushTimeout.isFinite(), "invalid value for mailbox push timeout");
+
+            Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+            boundedMailbox.put(TAG_MAILBOX_PUSH_TIMEOUT, timeout);
+            return (T)this;
+        }
+
+        public CommonConfig build() {
+            return new CommonConfig(merge());
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java
new file mode 100644 (file)
index 0000000..458f379
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.BoundedDequeBasedMailbox;
+import akka.dispatch.MailboxType;
+import akka.dispatch.ProducesMessageQueue;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+
+    private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+
+    private MeteredMessageQueue queue;
+    private Integer capacity;
+    private FiniteDuration pushTimeOut;
+    private MetricRegistry registry;
+
+    private final String QUEUE_SIZE = "q-size";
+
+    public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+
+        CommonConfig commonConfig = new CommonConfig(settings.config());
+        this.capacity = commonConfig.getMailBoxCapacity();
+        this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
+
+        MetricsReporter reporter = MetricsReporter.getInstance();
+        registry = reporter.getMetricsRegistry();
+    }
+
+
+    @Override
+    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+        this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+        monitorQueueSize(owner, this.queue);
+        return this.queue;
+    }
+
+    private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+        if (owner.isEmpty()) {
+            return; //there's no actor to monitor
+        }
+        String actorName = owner.get().path().toStringWithoutAddress();
+        String metricName = registry.name(actorName, QUEUE_SIZE);
+
+        if (registry.getMetrics().containsKey(metricName))
+            return; //already registered
+
+        Gauge queueSize = getQueueSizeGuage(monitoredQueue);
+        registerQueueSizeMetric(metricName, queueSize);
+    }
+
+
+    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
+
+        public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+            super(capacity, pushTimeOut);
+        }
+    }
+
+    private Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
+        return new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return monitoredQueue.size();
+            }
+        };
+    }
+
+    private void registerQueueSizeMetric(String metricName, Gauge metric){
+        try {
+            registry.register(metricName,metric);
+        } catch (IllegalArgumentException e) {
+            LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
+        }
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java
new file mode 100644 (file)
index 0000000..d67d413
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.UntypedActor;
+import akka.japi.Procedure;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+
+/**
+ * Represents behaviour that can be exhibited by actors of type {@link akka.actor.UntypedActor}
+ * <p/>
+ * This behaviour meters actor's default behaviour. It captures 2 metrics:
+ * <ul>
+ *     <li>message processing rate of actor's receive block</li>
+ *     <li>message processing rate by message type</li>
+ * </ul>
+ *
+ * The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter}
+ */
+public class MeteringBehavior implements Procedure<Object> {
+
+    private final UntypedActor meteredActor;
+
+    private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry();
+    private final String MSG_PROCESSING_RATE = "msg-rate";
+
+    private String actorName;
+    private Timer msgProcessingTimer;
+
+    /**
+     *
+     * @param actor whose behaviour needs to be metered
+     */
+    public MeteringBehavior(UntypedActor actor){
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+
+        this.meteredActor = actor;
+        actorName = meteredActor.getSelf().path().toStringWithoutAddress();
+        final String msgProcessingTime = MetricRegistry.name(actorName, MSG_PROCESSING_RATE);
+        msgProcessingTimer = METRICREGISTRY.timer(msgProcessingTime);
+    }
+
+    /**
+     * Uses 2 timers to measure message processing rate. One for overall message processing rate and
+     * another to measure rate by message type. The timers are re-used if they were previously created.
+     * <p/>
+     * {@link com.codahale.metrics.MetricRegistry} maintains a reservoir for different timers where
+     * collected timings are kept. It exposes various metrics for each timer based on collected
+     * data. Eg: count of messages, 99, 95, 50... percentiles, max, mean etc.
+     * <p/>
+     * These metrics are exposed as JMX bean.
+     *
+     * @see <a href="http://dropwizard.github.io/metrics/manual/core/#timers">
+     *     http://dropwizard.github.io/metrics/manual/core/#timers</a>
+     *
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    public void apply(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+
+        final String msgProcessingTimeByMsgType =
+                MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType);
+
+        final Timer msgProcessingTimerByMsgType = METRICREGISTRY.timer(msgProcessingTimeByMsgType);
+
+        //start timers
+        final Timer.Context context = msgProcessingTimer.time();
+        final Timer.Context contextByMsgType = msgProcessingTimerByMsgType.time();
+
+        meteredActor.onReceive(message);
+
+        //stop timers
+        contextByMsgType.stop();
+        context.stop();
+    }
+}
@@ -6,7 +6,7 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.remote.rpc.messages;
+package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 
@@ -14,7 +14,6 @@ public class Monitor {
     private final ActorRef actorRef;
 
     public Monitor(ActorRef actorRef){
-
         this.actorRef = actorRef;
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java
new file mode 100644 (file)
index 0000000..62b6055
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.typesafe.config.Config;
+
+/**
+ * Represents a unified view of configuration.
+ * <p/>
+ * It merges configuration from:
+ * <ul>
+ *     <li>Config subsystem</li>
+ *     <li>Akka configuration files</li>
+ * </ul>
+ *
+ * Configurations defined in config subsystem takes precedence.
+ */
+public interface UnifiedConfig {
+
+    /**
+     * Returns an immutable instance of unified configuration
+     * @return
+     */
+    public Config get();
+}
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.common.reporting;
+package org.opendaylight.controller.cluster.reporting;
 
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
@@ -21,7 +21,7 @@ import com.codahale.metrics.MetricRegistry;
 public class MetricsReporter implements AutoCloseable{
 
     private final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
-    private final String DOMAIN = "org.opendaylight.controller";
+    private final String DOMAIN = "org.opendaylight.controller.actor.metric";
 
     public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java
deleted file mode 100644 (file)
index c6d3625..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.common.actor;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.BoundedDequeBasedMailbox;
-import akka.dispatch.MailboxType;
-import akka.dispatch.ProducesMessageQueue;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.opendaylight.controller.common.reporting.MetricsReporter;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
-
-    private MeteredMessageQueue queue;
-    private Integer capacity;
-    private FiniteDuration pushTimeOut;
-    private ActorPath actorPath;
-    private MetricsReporter reporter;
-
-    private final String QUEUE_SIZE = "queue-size";
-    private final String CAPACITY = "mailbox-capacity";
-    private final String TIMEOUT  = "mailbox-push-timeout-time";
-    private final Long DEFAULT_TIMEOUT = 10L;
-
-    public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
-        Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
-        this.capacity = config.getInt(CAPACITY);
-        Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
-
-        Long timeout = -1L;
-        if ( config.hasPath(TIMEOUT) ){
-            timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
-        } else {
-            timeout = DEFAULT_TIMEOUT;
-        }
-        Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0");
-        this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
-
-        reporter = MetricsReporter.getInstance();
-    }
-
-
-    @Override
-    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
-        this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
-        monitorQueueSize(owner, this.queue);
-        return this.queue;
-    }
-
-    private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
-        if (owner.isEmpty()) {
-            return; //there's no actor to monitor
-        }
-        actorPath = owner.get().path();
-        String actorInstanceId = Integer.toString(owner.get().hashCode());
-
-        MetricRegistry registry = reporter.getMetricsRegistry();
-        String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
-
-        if (registry.getMetrics().containsKey(actorName))
-            return; //already registered
-
-        registry.register(actorName,
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return monitoredQueue.size();
-                    }
-                });
-    }
-
-
-    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
-
-        public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
-            super(capacity, pushTimeOut);
-        }
-    }
-
-}
-
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java
new file mode 100644 (file)
index 0000000..cd77ab2
--- /dev/null
@@ -0,0 +1,43 @@
+package org.opendaylight.controller.cluster.common.actor;
+
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CommonConfigTest {
+
+    @Test
+    public void testCommonConfigDefaults(){
+        CommonConfig config = new CommonConfig.Builder<>("testsystem").build();
+
+        assertNotNull(config.getActorSystemName());
+        assertNotNull(config.getMailBoxCapacity());
+        assertNotNull(config.getMailBoxName());
+        assertNotNull(config.getMailBoxPushTimeout());
+        assertNotNull(config.isMetricCaptureEnabled());
+    }
+
+    @Test
+    public void testCommonConfigOverride(){
+
+        int expectedCapacity = 123;
+        String timeoutValue = "1000ms";
+        CommonConfig config = new CommonConfig.Builder<>("testsystem")
+                .mailboxCapacity(expectedCapacity)
+                .mailboxPushTimeout(timeoutValue)
+                .metricCaptureEnabled(true)
+                .build();
+
+        assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+
+        FiniteDuration expectedTimeout = FiniteDuration.create(1000, TimeUnit.MILLISECONDS);
+        assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+        assertTrue(config.isMetricCaptureEnabled());
+    }
+}
\ No newline at end of file
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.common.actor;
+package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -25,11 +25,13 @@ import java.util.concurrent.locks.ReentrantLock;
 public class MeteredBoundedMailboxTest {
 
     private static ActorSystem actorSystem;
+    private static CommonConfig config;
     private final ReentrantLock lock = new ReentrantLock();
 
     @Before
     public void setUp() throws Exception {
-        actorSystem = ActorSystem.create("testsystem");
+        config = new CommonConfig.Builder<>("testsystem").build();
+        actorSystem = ActorSystem.create("testsystem", config.get());
     }
 
     @After
@@ -39,15 +41,14 @@ public class MeteredBoundedMailboxTest {
     }
 
     @Test
-    public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException {
+    public void shouldSendMsgToDeadLetterWhenQueueIsFull() throws InterruptedException {
         final JavaTestKit mockReceiver = new JavaTestKit(actorSystem);
         actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
 
 
         final FiniteDuration TWENTY_SEC = new FiniteDuration(20, TimeUnit.SECONDS);
 
-        String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
-        ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
+        ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()),
                                                      "pingpongactor");
 
         actorSystem.mailboxes().settings();
index 0392dec..a8a6513 100644 (file)
@@ -1,7 +1,7 @@
 testsystem {
 
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 10
     mailbox-push-timeout-time = 100ms
   }
index 3481bae..43696c7 100644 (file)
@@ -1,8 +1,5 @@
-testsystem {
-
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
-}
\ No newline at end of file
index 72da630..fbb666a 100644 (file)
@@ -56,6 +56,9 @@
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
                         <name>dom-broker</name>
                     </dom-broker>
+                    <enable-metric-capture xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">true</enable-metric-capture>
+                    <actor-system-name xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">odl-cluster-rpc</actor-system-name>
+                    <bounded-mailbox-capacity xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">1000</bounded-mailbox-capacity>
                 </module>
 
             </modules>
index 5a2116b..f632b9c 100644 (file)
@@ -1,10 +1,13 @@
 
 odl-cluster-data {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
-  }    
+  }
+
+  metric-capture-enabled = true
+
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -44,10 +47,13 @@ odl-cluster-data {
 
 odl-cluster-rpc {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
   }
+
+  metric-capture-enabled = true
+
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -62,7 +68,7 @@ odl-cluster-rpc {
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
+      seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"]
 
       auto-down-unreachable-after = 10s
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
deleted file mode 100644 (file)
index b326d61..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.osgi.framework.BundleContext;
-
-import java.io.File;
-
-public class ActorSystemFactory {
-
-    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
-    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
-    public static final String CONFIGURATION_NAME = "odl-cluster-data";
-
-    private static volatile ActorSystem actorSystem = null;
-
-    public static final ActorSystem getInstance(){
-        return actorSystem;
-    }
-
-    /**
-     * This method should be called only once during initialization
-     *
-     * @param bundleContext
-     */
-    public static final ActorSystem createInstance(final BundleContext bundleContext) {
-        if(actorSystem == null) {
-            // Create an OSGi bundle classloader for actor system
-            BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-                Thread.currentThread().getContextClassLoader());
-            synchronized (ActorSystemFactory.class) {
-                // Double check
-
-                if (actorSystem == null) {
-                    ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
-                        ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
-                    system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-                    actorSystem = system;
-                }
-            }
-        }
-
-        return actorSystem;
-    }
-
-
-    private static final Config readAkkaConfiguration(){
-        File defaultConfigFile = new File(AKKA_CONF_PATH);
-        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
-        return ConfigFactory.parseFile(defaultConfigFile);
-    }
-}
index b3283a1..f1c0df4 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.Props;
 import akka.japi.Creator;
 
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
index 818f733..3426112 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
index 8739ed1..a6f187d 100644 (file)
@@ -9,22 +9,60 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
-
+import akka.actor.Props;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
 
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class DistributedDataStoreFactory {
+
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+    public static final String CONFIGURATION_NAME = "odl-cluster-data";
+    private static AtomicReference<ActorSystem> actorSystem = new AtomicReference<>();
+
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DatastoreContext datastoreContext, BundleContext bundleContext) {
+                                                      DatastoreContext datastoreContext, BundleContext bundleContext) {
 
-        ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
+        ActorSystem actorSystem = getOrCreateInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
-            new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
-                    config, datastoreContext );
+                new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+                        config, datastoreContext);
+
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
         return dataStore;
     }
+
+    synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) {
+
+        if (actorSystem.get() != null){
+            return actorSystem.get();
+        }
+        // Create an OSGi bundle classloader for actor system
+        BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                Thread.currentThread().getContextClassLoader());
+
+        ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+        system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+        actorSystem.set(system);
+        return system;
+    }
+
+
+    private static final Config readAkkaConfiguration() {
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+    }
 }
index 7d57004..a1858f5 100644 (file)
@@ -26,6 +26,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -138,6 +140,9 @@ public class Shard extends RaftActor {
         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
+        if (isMetricsCaptureEnabled()) {
+            getContext().become(new MeteringBehavior(this));
+        }
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -406,7 +411,12 @@ public class Shard extends RaftActor {
         ActorRef transactionChain = getContext().actorOf(
                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
-            getSelf());
+                getSelf());
+    }
+
+    private boolean isMetricsCaptureEnabled(){
+        CommonConfig config = new CommonConfig(getContext().system().settings().config());
+        return config.isMetricCaptureEnabled();
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,
index 58cdefe..13ecaa5 100644 (file)
@@ -18,6 +18,8 @@ import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -48,7 +50,7 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedActorWithMetering {
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
index 65f865b..b810ed9 100644 (file)
@@ -16,6 +16,8 @@ import akka.japi.Creator;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
 
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
index 484bd54..8fe94cf 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
index 2dce6a1..e3ae5da 100644 (file)
@@ -18,6 +18,7 @@ import akka.japi.Creator;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
index c29f93b..6be6cda 100644 (file)
@@ -1,10 +1,13 @@
 
 odl-cluster-data {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
   }
+
+  metric-capture-enabled = true
+
   akka {
     loggers = ["akka.event.slf4j.Slf4jLogger"]
     cluster {
index 82bc5e2..e19a767 100644 (file)
@@ -41,13 +41,13 @@ module distributed-datastore-provider {
             range "1..max";
         }
     }
-    
+
     typedef operation-timeout-type {
         type uint16 {
             range "5..max";
         }
     }
-    
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
@@ -72,20 +72,32 @@ module distributed-datastore-provider {
             type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store executor.";
          }
-            
+
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
             type non-zero-uint16-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
-         
+
          leaf operation-timeout-in-seconds {
             default 5;
             type operation-timeout-type;
             description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
          }
+
+         leaf enable-metric-capture {
+            default false;
+            type boolean;
+            description "Enable or disable metric capture.";
+         }
+
+         leaf bounded-mailbox-capacity {
+             default 1000;
+             type non-zero-uint16-type;
+             description "Max queue size that an actor's mailbox can reach";
+         }
     }
-    
+
     // Augments the 'configuration' choice node under modules/module.
     augment "/config:modules/config:module/config:configuration" {
         case distributed-config-datastore-provider {
index 6851b1b..794b376 100644 (file)
@@ -15,7 +15,7 @@ akka {
     }
 }
 bounded-mailbox {
-  mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+  mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
   mailbox-capacity = 1000
   mailbox-push-timeout-time = 100ms
 }
index 8d454c4..0fb468b 100644 (file)
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-slf4j_${scala.version}</artifactId>
   </dependency>
+
+      <dependency>
+          <groupId>com.typesafe.akka</groupId>
+          <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+      </dependency>
     <!-- SAL Dependencies -->
 
     <dependency>
index 2be8ba4..c2e8125 100644 (file)
@@ -1,5 +1,7 @@
 package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
 
+import org.opendaylight.controller.cluster.common.actor.DefaultAkkaConfigurationReader;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.osgi.framework.BundleContext;
@@ -22,7 +24,14 @@ public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.ya
   @Override
   public java.lang.AutoCloseable createInstance() {
     Broker broker = getDomBrokerDependency();
-    return RemoteRpcProviderFactory.createInstance(broker, bundleContext);
+
+    RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(getActorSystemName())
+                              .metricCaptureEnabled(getEnableMetricCapture())
+                              .mailboxCapacity(getBoundedMailboxCapacity())
+                              .withConfigReader(new DefaultAkkaConfigurationReader())
+                              .build();
+
+    return RemoteRpcProviderFactory.createInstance(broker, bundleContext, config);
   }
 
   public void setBundleContext(BundleContext bundleContext) {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java
deleted file mode 100644 (file)
index 66593ae..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
-
-public abstract class AbstractUntypedActor extends UntypedActor {
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
-
-
-    public AbstractUntypedActor(){
-        LOG.debug("Actor created {}", getSelf());
-        getContext().
-            system().
-            actorSelection("user/termination-monitor").
-            tell(new Monitor(getSelf()), getSelf());
-    }
-
-    @Override public void onReceive(Object message) throws Exception {
-        LOG.debug("Received message {}", message);
-        handleReceive(message);
-        LOG.debug("Done handling message {}", message);
-    }
-
-    protected abstract void handleReceive(Object message) throws Exception;
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java
deleted file mode 100644 (file)
index da0d628..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-public class ActorConstants {
-  public static final String RPC_BROKER = "rpc-broker";
-  public static final String RPC_REGISTRY = "rpc-registry";
-  public static final String RPC_MANAGER = "rpc";
-
-  public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker";
-  public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry";
-  public static final String RPC_MANAGER_PATH = "/user/rpc";
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
deleted file mode 100644 (file)
index 6a442c5..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorSystem;
-import akka.osgi.BundleDelegatingClassLoader;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.BundleContext;
-
-
-public class ActorSystemFactory {
-
-    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc";
-    public static final String CONFIGURATION_NAME = "odl-cluster-rpc";
-
-    private static volatile ActorSystem actorSystem = null;
-
-  public static final ActorSystem getInstance(){
-     return actorSystem;
-  }
-
-  /**
-   * This method should be called only once during initialization
-   *
-   * @param bundleContext
-   */
-  public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) {
-    if(actorSystem == null) {
-      // Create an OSGi bundle classloader for actor system
-      BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-          Thread.currentThread().getContextClassLoader());
-      synchronized (ActorSystemFactory.class) {
-        // Double check
-        if (actorSystem == null) {
-          ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
-              akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader);
-          actorSystem = system;
-        }
-      }
-    } else {
-      throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system");
-    }
-  }
-
-}
index 7d7dbf0..0f84abb 100644 (file)
@@ -1,42 +1,40 @@
 package org.opendaylight.controller.remote.rpc;
 
-import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 
 import java.util.Collections;
 import java.util.Set;
 
+import static akka.pattern.Patterns.ask;
+
 public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
     private final ActorRef rpcBroker;
     private final SchemaContext schemaContext;
+    private final RemoteRpcProviderConfig config;
 
-    public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+    public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) {
         this.rpcBroker = rpcBroker;
         this.schemaContext = schemaContext;
+        this.config = config;
     }
 
     @Override
@@ -63,8 +61,7 @@ public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefa
 
         final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
 
-        scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
-                new Timeout(ActorUtil.ASK_DURATION));
+        scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index d088f22..8b4ce31 100644 (file)
@@ -31,21 +31,25 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
 
-  private final ActorSystem actorSystem;
   private final RpcProvisionRegistry rpcProvisionRegistry;
+
+  private ActorSystem actorSystem;
   private Broker.ProviderSession brokerSession;
   private SchemaContext schemaContext;
   private ActorRef rpcManager;
+  private RemoteRpcProviderConfig config;
 
 
   public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
     this.actorSystem = actorSystem;
     this.rpcProvisionRegistry = rpcProvisionRegistry;
+    this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
   }
 
   @Override
   public void close() throws Exception {
-    this.actorSystem.shutdown();
+    if (this.actorSystem != null)
+      this.actorSystem.shutdown();
   }
 
   @Override
@@ -60,17 +64,17 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
   }
 
   private void start() {
-    LOG.info("Starting all rpc listeners and actors.");
-    // Create actor to handle and sync routing table in cluster
+    LOG.info("Starting remote rpc service...");
+
     SchemaService schemaService = brokerSession.getService(SchemaService.class);
     schemaContext = schemaService.getGlobalContext();
 
-    rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER);
+    rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
+                                     config.getRpcManagerName());
 
-    LOG.debug("Rpc actors are created.");
+    LOG.debug("rpc manager started");
   }
 
-
   @Override
   public void onGlobalContextUpdated(SchemaContext schemaContext) {
     this.schemaContext = schemaContext;
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
new file mode 100644 (file)
index 0000000..3f6d42d
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RemoteRpcProviderConfig extends CommonConfig {
+
+    protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
+    protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
+    protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
+    protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
+    protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
+    protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
+    protected static final String TAG_ASK_DURATION = "ask-duration";
+    private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+
+    //locally cached values
+    private Timeout cachedAskDuration;
+    private FiniteDuration cachedGossipTickInterval;
+
+    public RemoteRpcProviderConfig(Config config){
+        super(config);
+    }
+
+    public String getRpcBrokerName(){
+        return get().getString(TAG_RPC_BROKER_NAME);
+    }
+
+    public String getRpcRegistryName(){
+        return get().getString(TAG_RPC_REGISTRY_NAME);
+    }
+
+    public String getRpcManagerName(){
+        return get().getString(TAG_RPC_MGR_NAME);
+    }
+
+    public String getRpcBrokerPath(){
+        return get().getString(TAG_RPC_BROKER_PATH);
+    }
+
+    public String getRpcRegistryPath(){
+        return get().getString(TAG_RPC_REGISTRY_PATH);
+
+    }
+
+    public String getRpcManagerPath(){
+        return get().getString(TAG_RPC_MGR_PATH);
+    }
+
+
+    public Timeout getAskDuration(){
+        if (cachedAskDuration != null){
+            return cachedAskDuration;
+        }
+
+        cachedAskDuration = new Timeout(new FiniteDuration(
+                get().getDuration(TAG_ASK_DURATION, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS));
+
+        return cachedAskDuration;
+    }
+
+    public FiniteDuration getGossipTickInterval(){
+        if (cachedGossipTickInterval != null) {
+            return cachedGossipTickInterval;
+        }
+
+        cachedGossipTickInterval = new FiniteDuration(
+                get().getDuration(TAG_GOSSIP_TICK_INTERVAL, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+        return cachedGossipTickInterval;
+    }
+
+    public static class Builder extends CommonConfig.Builder<Builder>{
+
+        public Builder(String actorSystemName){
+            super(actorSystemName);
+
+            //Actor names
+            configHolder.put(TAG_RPC_BROKER_NAME, "broker");
+            configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
+            configHolder.put(TAG_RPC_MGR_NAME, "rpc");
+
+            //Actor paths
+            configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker");
+            configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry");
+            configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc");
+
+            //durations
+            configHolder.put(TAG_ASK_DURATION, "15s");
+            configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
+
+        }
+
+        public RemoteRpcProviderConfig build(){
+            return new RemoteRpcProviderConfig(merge());
+        }
+    }
+
+
+}
index 0e6b795..2e355d4 100644 (file)
@@ -8,19 +8,43 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-
-import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader;
+import akka.actor.ActorSystem;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.Config;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemoteRpcProviderFactory {
-    public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProviderFactory.class);
+
+    public static RemoteRpcProvider createInstance(
+            final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
 
-      ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader());
       RemoteRpcProvider rpcProvider =
-          new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
+          new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker);
+
       broker.registerProvider(rpcProvider);
       return rpcProvider;
     }
+
+    private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){
+
+        // Create an OSGi bundle classloader for actor system
+        BundleDelegatingClassLoader classLoader =
+                new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                        Thread.currentThread().getContextClassLoader());
+
+        Config actorSystemConfig = config.get();
+        LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+
+        if (config.isMetricCaptureEnabled()) {
+            LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
+                    config.getActorSystemName());
+        }
+
+        return ActorSystem.create(config.getActorSystemName(), actorSystemConfig, classLoader);
+    }
 }
index 2aca655..6b02235 100644 (file)
@@ -8,25 +8,26 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Pair;
-import akka.util.Timeout;
-
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
 import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -36,16 +37,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Future;
 
+import static akka.pattern.Patterns.ask;
+
 /**
  * Actor to initiate execution of remote RPC on other nodes of the cluster.
  */
@@ -56,12 +54,14 @@ public class RpcBroker extends AbstractUntypedActor {
     private final Broker.ProviderSession brokerSession;
     private final ActorRef rpcRegistry;
     private final SchemaContext schemaContext;
+    private final RemoteRpcProviderConfig config;
 
     private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
             SchemaContext schemaContext) {
         this.brokerSession = brokerSession;
         this.rpcRegistry = rpcRegistry;
         this.schemaContext = schemaContext;
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
     }
 
     public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
@@ -85,8 +85,7 @@ public class RpcBroker extends AbstractUntypedActor {
                 null, msg.getRpc(), msg.getIdentifier());
         RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
 
-        scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
-                new Timeout(ActorUtil.LOCAL_ASK_DURATION));
+        scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
 
         final ActorRef sender = getSender();
         final ActorRef self = self();
@@ -129,8 +128,7 @@ public class RpcBroker extends AbstractUntypedActor {
         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
                 schemaContext), msg.getRpc());
 
-        scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
-                new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+        scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index d4da226..4ae9c2e 100644 (file)
@@ -15,11 +15,9 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.japi.Creator;
 import akka.japi.Function;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -27,6 +25,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+
 import java.util.Set;
 
 /**
@@ -43,16 +42,19 @@ public class RpcManager extends AbstractUntypedActor {
   private ActorRef rpcBroker;
   private ActorRef rpcRegistry;
   private final Broker.ProviderSession brokerSession;
+  private final RemoteRpcProviderConfig config;
   private RpcListener rpcListener;
   private RoutedRpcListener routeChangeListener;
   private RemoteRpcImplementation rpcImplementation;
   private final RpcProvisionRegistry rpcProvisionRegistry;
 
   private RpcManager(SchemaContext schemaContext,
-                     Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
+                     Broker.ProviderSession brokerSession,
+                     RpcProvisionRegistry rpcProvisionRegistry) {
     this.schemaContext = schemaContext;
     this.brokerSession = brokerSession;
     this.rpcProvisionRegistry = rpcProvisionRegistry;
+    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
 
     createRpcActors();
     startListeners();
@@ -60,7 +62,8 @@ public class RpcManager extends AbstractUntypedActor {
 
 
   public static Props props(final SchemaContext schemaContext,
-                            final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
+                            final Broker.ProviderSession brokerSession,
+                            final RpcProvisionRegistry rpcProvisionRegistry) {
     return Props.create(new Creator<RpcManager>() {
       @Override
       public RpcManager create() throws Exception {
@@ -72,15 +75,13 @@ public class RpcManager extends AbstractUntypedActor {
   private void createRpcActors() {
     LOG.debug("Create rpc registry and broker actors");
 
-      Config conf = ConfigFactory.load();
-
     rpcRegistry =
             getContext().actorOf(Props.create(RpcRegistry.class).
-                withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
+                withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
 
     rpcBroker =
             getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
-                withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
+                withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
 
     RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
     rpcRegistry.tell(localRouter, self());
@@ -91,7 +92,7 @@ public class RpcManager extends AbstractUntypedActor {
 
     rpcListener = new RpcListener(rpcRegistry);
     routeChangeListener = new RoutedRpcListener(rpcRegistry);
-    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
 
     brokerSession.addRpcRegistrationListener(rpcListener);
     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
index a90f1e1..abe2008 100644 (file)
@@ -12,7 +12,7 @@ import akka.actor.Terminated;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
+import org.opendaylight.controller.cluster.common.actor.Monitor;
 
 public class TerminationMonitor extends UntypedActor{
     protected final LoggingAdapter LOG =
index 5109d31..095d709 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -18,9 +17,10 @@ import akka.japi.Option;
 import akka.japi.Pair;
 import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import scala.concurrent.Future;
 
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
@@ -45,7 +45,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends UntypedActor {
+public class RpcRegistry extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
@@ -59,9 +59,11 @@ public class RpcRegistry extends UntypedActor {
      */
     private ActorRef localRouter;
 
+    private RemoteRpcProviderConfig config;
+
     public RpcRegistry() {
         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
+        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
         log.info("Bucket store path = {}", bucketStore.path().toString());
     }
 
@@ -69,11 +71,9 @@ public class RpcRegistry extends UntypedActor {
         this.bucketStore = bucketStore;
     }
 
-    @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: message [{}]", message);
 
+    @Override
+    protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
         if (message instanceof SetLocalRouter)
@@ -108,7 +108,7 @@ public class RpcRegistry extends UntypedActor {
 
         Preconditions.checkState(localRouter != null, "Router must be set first");
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
     }
 
@@ -117,7 +117,7 @@ public class RpcRegistry extends UntypedActor {
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
 
     }
@@ -130,7 +130,7 @@ public class RpcRegistry extends UntypedActor {
     private void receiveGetRouter(FindRouters msg) {
         final ActorRef sender = getSender();
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
     }
 
@@ -151,7 +151,8 @@ public class RpcRegistry extends UntypedActor {
      * @param routeId
      * @return
      */
-    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    private Messages.FindRoutersReply createReplyWithRouters(
+            Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
 
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
@@ -184,7 +185,8 @@ public class RpcRegistry extends UntypedActor {
      * @param sender  client who asked to find the routers.
      * @return
      */
-    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+    private Mapper<Object, Void> getMapperToGetRouter(
+            final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
         return new Mapper<Object, Void>() {
             @Override
             public Void apply(Object replyMessage) {
index ff51f4f..3de3fc0 100644 (file)
@@ -12,11 +12,11 @@ import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.utils.ConditionalProbe;
 
 import java.util.HashMap;
@@ -45,14 +45,14 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends UntypedActor {
+public class BucketStore extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     /**
      * Bucket owned by the node
      */
-    private BucketImpl localBucket = new BucketImpl();;
+    private BucketImpl localBucket = new BucketImpl();
 
     /**
      * Buckets ownded by other known nodes in the cluster
@@ -71,20 +71,24 @@ public class BucketStore extends UntypedActor {
 
     private ConditionalProbe probe;
 
+    private final RemoteRpcProviderConfig config;
+
+    public BucketStore(){
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    }
+
     @Override
     public void preStart(){
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
         if ( provider instanceof ClusterActorRefProvider)
-            getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
+            getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
     }
 
-    @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
 
+    @Override
+    protected void handleReceive(Object message) throws Exception {
         if (probe != null) {
             probe.tell(message, getSelf());
         }
@@ -100,17 +104,16 @@ public class BucketStore extends UntypedActor {
             receiveGetLocalBucket();
         } else if (message instanceof GetBucketsByMembers) {
             receiveGetBucketsByMembers(
-                ((GetBucketsByMembers) message).getMembers());
+                    ((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
             receiveUpdateRemoteBuckets(
-                ((UpdateRemoteBuckets) message).getBuckets());
+                    ((UpdateRemoteBuckets) message).getBuckets());
         } else {
             log.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
-
     }
 
     /**
index f6ce5e5..85c6ebe 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
@@ -21,7 +20,8 @@ import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -59,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go
  *
  */
 
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
@@ -79,7 +79,11 @@ public class Gossiper extends UntypedActor {
 
     private Boolean autoStartGossipTicks = true;
 
-    public Gossiper(){}
+    private RemoteRpcProviderConfig config;
+
+    public Gossiper(){
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    }
 
     /**
      * Helpful for testing
@@ -106,7 +110,7 @@ public class Gossiper extends UntypedActor {
         if (autoStartGossipTicks) {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
-                    ActorUtil.GOSSIP_TICK_INTERVAL,                 //interval
+                    config.getGossipTickInterval(),                 //interval
                     getSelf(),                                       //target
                     new Messages.GossiperMessages.GossipTick(),      //message
                     getContext().dispatcher(),                       //execution context
@@ -124,22 +128,19 @@ public class Gossiper extends UntypedActor {
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+    protected void handleReceive(Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
         if (message instanceof GossipTick)
             receiveGossipTick();
 
-        //Message from remote gossiper with its bucket versions
+            //Message from remote gossiper with its bucket versions
         else if (message instanceof GossipStatus)
             receiveGossipStatus((GossipStatus) message);
 
-        //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
-        //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
-        //message with its local versions
+            //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+            //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+            //message with its local versions
         else if (message instanceof GossipEnvelope)
             receiveGossip((GossipEnvelope) message);
 
@@ -196,7 +197,7 @@ public class Gossiper extends UntypedActor {
     void receiveGossipTick(){
         if (clusterMembers.size() == 0) return; //no members to send gossip status to
 
-        Address remoteMemberToGossipTo = null;
+        Address remoteMemberToGossipTo;
 
         if (clusterMembers.size() == 1)
             remoteMemberToGossipTo = clusterMembers.get(0);
@@ -229,7 +230,7 @@ public class Gossiper extends UntypedActor {
 
         final ActorRef sender = getSender();
         Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
 
@@ -271,7 +272,7 @@ public class Gossiper extends UntypedActor {
     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
 
         Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
+                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
     }
 
@@ -284,7 +285,7 @@ public class Gossiper extends UntypedActor {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         //Find gossiper on remote system
         ActorSelection remoteRef = getContext().system().actorSelection(
@@ -382,8 +383,6 @@ public class Gossiper extends UntypedActor {
                             localIsOlder.add(address);
                         else if (localVersions.get(address) > remoteVersions.get(address))
                             localIsNewer.add(address);
-                        else
-                            continue;
                     }
 
                     if (!localIsOlder.isEmpty())
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java
deleted file mode 100644 (file)
index e2baffa..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-package org.opendaylight.controller.remote.rpc.utils;
-
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class ActorUtil {
-    public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
-    public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
-    public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
-    public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
-    public static final String MAILBOX = "bounded-mailbox";
-}
index 266832a..39ac991 100644 (file)
@@ -39,7 +39,7 @@ odl-cluster-data {
 
 odl-cluster-rpc {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
   }
index 08db5c0..334d872 100644 (file)
@@ -5,11 +5,11 @@ module remote-rpc-connector {
 
     import config { prefix config; revision-date 2013-04-05; }
     import opendaylight-md-sal-dom {prefix dom;}
-    
+
     description
         "This module contains the base YANG definitions for
                  the remote routed rpc";
+
     revision "2014-07-07" {
         description
             "Initial revision";
@@ -25,7 +25,7 @@ module remote-rpc-connector {
     augment "/config:modules/config:module/config:configuration" {
         case remote-rpc-connector {
             when "/config:modules/config:module/config:type = 'remote-rpc-connector'";
-            
+
             container dom-broker {
                 uses config:service-ref {
                     refine type {
@@ -34,6 +34,24 @@ module remote-rpc-connector {
                     }
                 }
             }
+
+            leaf enable-metric-capture {
+                default false;
+                type boolean;
+                description "Enable or disable metric capture.";
+            }
+
+            leaf actor-system-name {
+                default odl-cluster-rpc;
+                type string;
+                description "Name by which actor system is identified. Its also used to find relevant configuration";
+            }
+
+            leaf bounded-mailbox-capacity {
+                default 1000;
+                type uint16;
+                description "Max queue size that an actor's mailbox can reach";
+            }
         }
     }
 
index 8d88682..46406fd 100644 (file)
@@ -8,17 +8,10 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableList;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -26,9 +19,9 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
@@ -36,12 +29,16 @@ import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
-import com.google.common.collect.ImmutableList;
-import com.typesafe.config.ConfigFactory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Base class for RPC tests.
@@ -70,8 +67,10 @@ public class AbstractRpcTest {
 
     @BeforeClass
     public static void setup() throws InterruptedException {
-        node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
-        node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+        node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+        node2 = ActorSystem.create("opendaylight-rpc", config2.get());
     }
 
     @AfterClass
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java
deleted file mode 100644 (file)
index cd1cd91..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorSystem;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ActorSystemFactoryTest {
-  ActorSystem system = null;
-
-  @Test
-  public void testActorSystemCreation(){
-    BundleContext context = mock(BundleContext.class);
-    when(context.getBundle()).thenReturn(mock(Bundle.class));
-
-    AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class);
-    when(reader.read()).thenReturn(ConfigFactory.load());
-
-    ActorSystemFactory.createInstance(context, reader);
-    system = ActorSystemFactory.getInstance();
-    Assert.assertNotNull(system);
-    // Check illegal state exception
-
-    try {
-      ActorSystemFactory.createInstance(context, reader);
-      fail("Illegal State exception should be thrown, while creating actor system second time");
-    } catch (IllegalStateException e) {
-    }
-  }
-
-  @After
-  public void cleanup() throws InterruptedException {
-    if(system != null) {
-      system.shutdown();
-    }
-  }
-}
index 6c3a57b..49451dd 100644 (file)
@@ -8,27 +8,26 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static org.junit.Assert.assertEquals;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import akka.testkit.JavaTestKit;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import static org.junit.Assert.assertEquals;
 
 /***
  * Unit tests for RemoteRpcImplementation.
@@ -42,7 +41,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             final CompositeNode input = makeRPCInput("foo");
             final CompositeNode output = makeRPCOutput("bar");
@@ -68,7 +67,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             QName instanceQName = new QName(new URI("ns"), "instance");
             YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
@@ -99,7 +98,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             final CompositeNode input = makeRPCInput("foo");
 
@@ -125,7 +124,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             final CompositeNode input = makeRPCInput("foo");
 
@@ -182,4 +181,8 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
 
         return invokeRpcMsg;
     }
+
+    private RemoteRpcProviderConfig getConfig(){
+        return new RemoteRpcProviderConfig.Builder("unit-test").build();
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java
new file mode 100644 (file)
index 0000000..ae75252
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class RemoteRpcProviderConfigTest {
+
+    @Test
+    public void testConfigDefaults() {
+
+        Config c = ConfigFactory.parseFile(new File("application.conf"));
+        RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
+
+        //Assert on configurations from common config
+        Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
+        Assert.assertNotNull(config.getMailBoxCapacity());
+        Assert.assertNotNull(config.getMailBoxName());
+        Assert.assertNotNull(config.getMailBoxPushTimeout());
+
+        //rest of the configurations should be set
+        Assert.assertNotNull(config.getActorSystemName());
+        Assert.assertNotNull(config.getRpcBrokerName());
+        Assert.assertNotNull(config.getRpcBrokerPath());
+        Assert.assertNotNull(config.getRpcManagerName());
+        Assert.assertNotNull(config.getRpcManagerPath());
+        Assert.assertNotNull(config.getRpcRegistryName());
+        Assert.assertNotNull(config.getRpcRegistryPath());
+        Assert.assertNotNull(config.getAskDuration());
+        Assert.assertNotNull(config.getGossipTickInterval());
+
+
+
+    }
+
+    @Test
+    public void testConfigCustomizations() {
+
+        AkkaConfigurationReader reader = new TestConfigReader();
+
+        final int expectedCapacity = 100;
+        String timeOutVal = "10ms";
+        FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS);
+
+        RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test")
+                .metricCaptureEnabled(true)//enable metric capture
+                .mailboxCapacity(expectedCapacity)
+                .mailboxPushTimeout(timeOutVal)
+                .withConfigReader(reader)
+                .build();
+
+        Assert.assertTrue(config.isMetricCaptureEnabled());
+        Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+        Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+        //Now check this config inside an actor
+        ActorSystem system = ActorSystem.create("unit-test", config.get());
+        TestActorRef<ConfigTestActor> configTestActorTestActorRef =
+                TestActorRef.create(system, Props.create(ConfigTestActor.class));
+
+        ConfigTestActor actor = configTestActorTestActorRef.underlyingActor();
+        Config actorConfig = actor.getConfig();
+
+        config = new RemoteRpcProviderConfig(actorConfig);
+
+        Assert.assertTrue(config.isMetricCaptureEnabled());
+        Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+        Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+    }
+
+    public static class ConfigTestActor extends UntypedActor {
+
+        private Config actorSystemConfig;
+
+        public ConfigTestActor() {
+            this.actorSystemConfig = getContext().system().settings().config();
+        }
+
+        @Override
+        public void onReceive(Object message) throws Exception {
+        }
+
+        /**
+         * Only for testing. NEVER expose actor's internal state like this.
+         *
+         * @return
+         */
+        public Config getConfig() {
+            return actorSystemConfig;
+        }
+    }
+
+    public static class TestConfigReader implements AkkaConfigurationReader {
+
+        @Override
+        public Config read() {
+            return ConfigFactory.parseResources("application.conf");
+
+        }
+    }
+}
\ No newline at end of file
index 8a7e4a0..8b4599c 100644 (file)
@@ -13,7 +13,7 @@ package org.opendaylight.controller.remote.rpc;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.Config;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -33,11 +33,14 @@ import static org.mockito.Mockito.when;
 public class RemoteRpcProviderTest {
 
   static ActorSystem system;
-
+  static RemoteRpcProviderConfig moduleConfig;
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+    moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+    Config config = moduleConfig.get();
+    system = ActorSystem.create("odl-cluster-rpc", config);
+
   }
 
   @AfterClass
@@ -53,9 +56,14 @@ public class RemoteRpcProviderTest {
     SchemaService schemaService = mock(SchemaService.class);
     when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
     when(session.getService(SchemaService.class)).thenReturn(schemaService);
+
     rpcProvider.onSessionInitiated(session);
-    ActorRef actorRef = Await.result(system.actorSelection(ActorConstants.RPC_MANAGER_PATH).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
-        Duration.create(2, TimeUnit.SECONDS));
-    Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH));
+
+    ActorRef actorRef = Await.result(
+            system.actorSelection(
+                    moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
+                                                                 Duration.create(2, TimeUnit.SECONDS));
+
+    Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
   }
 }
index 83f5293..f6f720e 100644 (file)
@@ -9,12 +9,12 @@ import akka.actor.ChildActorPath;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Predicate;
-import com.typesafe.config.ConfigFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
@@ -45,9 +45,12 @@ public class RpcRegistryTest {
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
-    node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
-    node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+    RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+    RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+    RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+    node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+    node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+    node3 = ActorSystem.create("opendaylight-rpc", config3.get());
   }
 
   @AfterClass
@@ -204,7 +207,10 @@ public class RpcRegistryTest {
         new ConditionalProbe(probe.getRef(), new Predicate() {
           @Override
           public boolean apply(@Nullable Object input) {
-            return clazz.equals(input.getClass());
+              if (input != null)
+                return clazz.equals(input.getClass());
+              else
+                  return false;
           }
         });
 
index 5c4af8d..8e31081 100644 (file)
@@ -1,6 +1,6 @@
 odl-cluster-rpc{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -37,11 +37,12 @@ odl-cluster-rpc{
 }
 unit-test{
   akka {
-    loglevel = "INFO"
+    loglevel = "DEBUG"
     #loggers = ["akka.event.slf4j.Slf4jLogger"]
   }
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    #mailbox-capacity is specified in config subsystem
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -49,7 +50,7 @@ unit-test{
 
 memberA{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -82,7 +83,7 @@ memberA{
 }
 memberB{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -116,7 +117,7 @@ memberB{
 }
 memberC{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.