Metrics and Configuration 65/10665/15
authorAbhishek Kumar <abhishk2@cisco.com>
Wed, 27 Aug 2014 20:07:30 +0000 (13:07 -0700)
committerAbhishek Kumar <abhishk2@cisco.com>
Tue, 9 Sep 2014 20:18:11 +0000 (20:18 +0000)
1. Adds a new abstract class AbstractMeteredUntypedActored that
   extends AbstractUntypedActor. This adds metrics capture
   capability which can be turned on using Config Subsystem.
   By default its turned off.

2. Updates Shard actor; adds metrics capture capability which can be
   turned on via Config Subsystem.

3. In remote-rpc-connector module, we can now pass configuration
   obtained from config subsystem to actor system so that its
   available to all actors. This obviates the need to manually pass
   the configuration via constructors or other *Constant or *Util
   classes. It brings all configuration items in the module at one
   place and makes it easier to move them to config subsystem, if
   its required.

4. In spirit of DRY, moves common code to clustering-commons

5. Minor code inspection fixes.

6. Makes mailbox-capacity configurable via config subsystem.

Patch 9:
Adds a new behaviour (MeteringBehavior.java).
AbstractUntypedActorWithMetering and Shard actor exhibit this
behavior current.
This patch also refactors unified configuration (config subsystem
+ typesafe config) pieces.

Note that in subsequent commits distributed-datastore will have
its configuration added to actor system configuration. Also few
more configuration items in remote-rpc-connector will go to
config subsystem. I wanted to limit the amount of changes going
into this already large commit.

Change-Id: I383ec813c16ed09ed0e68ee59179f454c0d174cf
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
57 files changed:
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java with 81% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AkkaConfigurationReader.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java with 87% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/DefaultAkkaConfigurationReader.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java with 93% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/Monitor.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java with 90% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/reporting/MetricsReporter.java [moved from opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java with 90% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java [deleted file]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java [moved from opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java with 88% similarity]
opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf
opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf

index a3619ec4d230463edcbf7e15957b230db9d0cb09..d12f867ac5663b7637ecff59110dfbb0303798ec 100644 (file)
       </dependency>
 
 
+      <dependency>
+          <groupId>com.typesafe.akka</groupId>
+          <artifactId>akka-osgi_${scala.version}</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.typesafe.akka</groupId>
+          <artifactId>akka-actor_${scala.version}</artifactId>
+      </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java
new file mode 100644 (file)
index 0000000..3a66aa1
--- /dev/null
@@ -0,0 +1,47 @@
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractConfig implements UnifiedConfig {
+
+    private Config config;
+
+    public AbstractConfig(Config config){
+        this.config = config;
+    }
+
+    @Override
+    public Config get() {
+        return config;
+    }
+
+    public static abstract class Builder<T extends Builder>{
+        protected Map<String, Object> configHolder;
+        protected Config fallback;
+
+        private final String actorSystemName;
+
+        public Builder(String actorSystemName){
+            Preconditions.checkArgument(actorSystemName != null, "Actor system name must not be null");
+            this.actorSystemName = actorSystemName;
+            configHolder = new HashMap<>();
+        }
+
+        public T withConfigReader(AkkaConfigurationReader reader){
+            fallback = reader.read().getConfig(actorSystemName);
+            return (T)this;
+        }
+
+        protected Config merge(){
+            if (fallback == null)
+                fallback = ConfigFactory.load().getConfig(actorSystemName);
+
+            return ConfigFactory.parseMap(configHolder).withFallback(fallback);
+        }
+    }
+}
@@ -6,31 +6,32 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.Monitor;
 
 public abstract class AbstractUntypedActor extends UntypedActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-
     public AbstractUntypedActor() {
         LOG.debug("Actor created {}", getSelf());
         getContext().
             system().
             actorSelection("user/termination-monitor").
             tell(new Monitor(getSelf()), getSelf());
+
     }
 
     @Override public void onReceive(Object message) throws Exception {
-        LOG.debug("Received message {}", message.getClass().getSimpleName());
+        final String messageType = message.getClass().getSimpleName();
+        LOG.debug("Received message {}", messageType);
+
         handleReceive(message);
-        LOG.debug("Done handling message {}",
-            message.getClass().getSimpleName());
+
+        LOG.debug("Done handling message {}", messageType);
     }
 
     protected abstract void handleReceive(Object message) throws Exception;
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActorWithMetering.java
new file mode 100644 (file)
index 0000000..5497f93
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+/**
+ * Actor with its behaviour metered. Metering is enabled by configuration.
+ */
+public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor {
+
+    public AbstractUntypedActorWithMetering() {
+        if (isMetricsCaptureEnabled())
+            getContext().become(new MeteringBehavior(this));
+    }
+
+    private boolean isMetricsCaptureEnabled(){
+        CommonConfig config = new CommonConfig(getContext().system().settings().config());
+        return config.isMetricCaptureEnabled();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java
new file mode 100644 (file)
index 0000000..0d139f9
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class CommonConfig extends AbstractConfig {
+
+    protected static final String TAG_ACTOR_SYSTEM_NAME = "actor-system-name";
+    protected static final String TAG_METRIC_CAPTURE_ENABLED = "metric-capture-enabled";
+    protected static final String TAG_MAILBOX_CAPACITY = "mailbox-capacity";
+    protected static final String TAG_MAILBOX = "bounded-mailbox";
+    protected static final String TAG_MAILBOX_PUSH_TIMEOUT = "mailbox-push-timeout-time";
+
+    //TODO: Ideally these defaults should go to reference.conf
+    // https://bugs.opendaylight.org/show_bug.cgi?id=1709
+    private static final int DEFAULT_MAILBOX_CAPACITY = 1000;
+    private static final int DEFAULT_MAILBOX_PUSH_TIMEOUT = 100;
+
+    //locally cached values
+    private FiniteDuration cachedMailBoxPushTimeout;
+    private Integer cachedMailBoxCapacity;
+    private Boolean cachedMetricCaptureEnableFlag;
+
+    public CommonConfig(Config config) {
+        super(config);
+    }
+
+    public String getActorSystemName() {
+        return get().getString(TAG_ACTOR_SYSTEM_NAME);
+    }
+
+    public boolean isMetricCaptureEnabled(){
+        if (cachedMetricCaptureEnableFlag != null){
+            return cachedMetricCaptureEnableFlag;
+        }
+
+        cachedMetricCaptureEnableFlag = get().hasPath(TAG_METRIC_CAPTURE_ENABLED)
+                ? get().getBoolean(TAG_METRIC_CAPTURE_ENABLED)
+                : false;
+
+        return cachedMetricCaptureEnableFlag;
+    }
+
+    public String getMailBoxName() {
+        return TAG_MAILBOX;
+    }
+
+    public Integer getMailBoxCapacity() {
+
+        if (cachedMailBoxCapacity != null) {
+            return cachedMailBoxCapacity;
+        }
+
+        final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_CAPACITY).toString();
+        cachedMailBoxCapacity = get().hasPath(PATH)
+                ? get().getInt(PATH)
+                : DEFAULT_MAILBOX_CAPACITY;
+
+        return cachedMailBoxCapacity;
+    }
+
+    public FiniteDuration getMailBoxPushTimeout() {
+
+        if (cachedMailBoxPushTimeout != null) {
+            return cachedMailBoxPushTimeout;
+        }
+
+        final String PATH = new StringBuilder(TAG_MAILBOX).append(".").append(TAG_MAILBOX_PUSH_TIMEOUT).toString();
+
+        long timeout = get().hasPath(PATH)
+                ? get().getDuration(PATH, TimeUnit.NANOSECONDS)
+                : DEFAULT_MAILBOX_PUSH_TIMEOUT;
+
+        cachedMailBoxPushTimeout = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
+        return cachedMailBoxPushTimeout;
+    }
+
+    public static class Builder<T extends Builder> extends AbstractConfig.Builder<T>{
+
+        public Builder(String actorSystemName) {
+            super(actorSystemName);
+
+            //actor system config
+            configHolder.put(TAG_ACTOR_SYSTEM_NAME, actorSystemName);
+
+            //config for bounded mailbox
+            configHolder.put(TAG_MAILBOX, new HashMap<String, Object>());
+        }
+
+        public T metricCaptureEnabled(boolean enabled) {
+            configHolder.put(TAG_METRIC_CAPTURE_ENABLED, String.valueOf(enabled));
+            return (T)this;
+        }
+
+        public T mailboxCapacity(int capacity) {
+            Preconditions.checkArgument(capacity > 0, "mailbox capacity must be >0");
+
+            Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+            boundedMailbox.put(TAG_MAILBOX_CAPACITY, capacity);
+            return (T)this;
+        }
+
+        public T mailboxPushTimeout(String timeout){
+            Duration pushTimeout = Duration.create(timeout);
+            Preconditions.checkArgument(pushTimeout.isFinite(), "invalid value for mailbox push timeout");
+
+            Map<String, Object> boundedMailbox = (Map) configHolder.get(TAG_MAILBOX);
+            boundedMailbox.put(TAG_MAILBOX_PUSH_TIMEOUT, timeout);
+            return (T)this;
+        }
+
+        public CommonConfig build() {
+            return new CommonConfig(merge());
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailbox.java
new file mode 100644 (file)
index 0000000..458f379
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.BoundedDequeBasedMailbox;
+import akka.dispatch.MailboxType;
+import akka.dispatch.ProducesMessageQueue;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
+
+    private final Logger LOG = LoggerFactory.getLogger(MeteredBoundedMailbox.class);
+
+    private MeteredMessageQueue queue;
+    private Integer capacity;
+    private FiniteDuration pushTimeOut;
+    private MetricRegistry registry;
+
+    private final String QUEUE_SIZE = "q-size";
+
+    public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+
+        CommonConfig commonConfig = new CommonConfig(settings.config());
+        this.capacity = commonConfig.getMailBoxCapacity();
+        this.pushTimeOut = commonConfig.getMailBoxPushTimeout();
+
+        MetricsReporter reporter = MetricsReporter.getInstance();
+        registry = reporter.getMetricsRegistry();
+    }
+
+
+    @Override
+    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+        this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+        monitorQueueSize(owner, this.queue);
+        return this.queue;
+    }
+
+    private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+        if (owner.isEmpty()) {
+            return; //there's no actor to monitor
+        }
+        String actorName = owner.get().path().toStringWithoutAddress();
+        String metricName = registry.name(actorName, QUEUE_SIZE);
+
+        if (registry.getMetrics().containsKey(metricName))
+            return; //already registered
+
+        Gauge queueSize = getQueueSizeGuage(monitoredQueue);
+        registerQueueSizeMetric(metricName, queueSize);
+    }
+
+
+    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
+
+        public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+            super(capacity, pushTimeOut);
+        }
+    }
+
+    private Gauge getQueueSizeGuage(final MeteredMessageQueue monitoredQueue ){
+        return new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return monitoredQueue.size();
+            }
+        };
+    }
+
+    private void registerQueueSizeMetric(String metricName, Gauge metric){
+        try {
+            registry.register(metricName,metric);
+        } catch (IllegalArgumentException e) {
+            LOG.warn("Unable to register queue size in metrics registry. Failed with exception {}. ", e);
+        }
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/MeteringBehavior.java
new file mode 100644 (file)
index 0000000..d67d413
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.UntypedActor;
+import akka.japi.Procedure;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+
+/**
+ * Represents behaviour that can be exhibited by actors of type {@link akka.actor.UntypedActor}
+ * <p/>
+ * This behaviour meters actor's default behaviour. It captures 2 metrics:
+ * <ul>
+ *     <li>message processing rate of actor's receive block</li>
+ *     <li>message processing rate by message type</li>
+ * </ul>
+ *
+ * The information is reported to {@link org.opendaylight.controller.cluster.reporting.MetricsReporter}
+ */
+public class MeteringBehavior implements Procedure<Object> {
+
+    private final UntypedActor meteredActor;
+
+    private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry();
+    private final String MSG_PROCESSING_RATE = "msg-rate";
+
+    private String actorName;
+    private Timer msgProcessingTimer;
+
+    /**
+     *
+     * @param actor whose behaviour needs to be metered
+     */
+    public MeteringBehavior(UntypedActor actor){
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+
+        this.meteredActor = actor;
+        actorName = meteredActor.getSelf().path().toStringWithoutAddress();
+        final String msgProcessingTime = MetricRegistry.name(actorName, MSG_PROCESSING_RATE);
+        msgProcessingTimer = METRICREGISTRY.timer(msgProcessingTime);
+    }
+
+    /**
+     * Uses 2 timers to measure message processing rate. One for overall message processing rate and
+     * another to measure rate by message type. The timers are re-used if they were previously created.
+     * <p/>
+     * {@link com.codahale.metrics.MetricRegistry} maintains a reservoir for different timers where
+     * collected timings are kept. It exposes various metrics for each timer based on collected
+     * data. Eg: count of messages, 99, 95, 50... percentiles, max, mean etc.
+     * <p/>
+     * These metrics are exposed as JMX bean.
+     *
+     * @see <a href="http://dropwizard.github.io/metrics/manual/core/#timers">
+     *     http://dropwizard.github.io/metrics/manual/core/#timers</a>
+     *
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    public void apply(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+
+        final String msgProcessingTimeByMsgType =
+                MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType);
+
+        final Timer msgProcessingTimerByMsgType = METRICREGISTRY.timer(msgProcessingTimeByMsgType);
+
+        //start timers
+        final Timer.Context context = msgProcessingTimer.time();
+        final Timer.Context contextByMsgType = msgProcessingTimerByMsgType.time();
+
+        meteredActor.onReceive(message);
+
+        //stop timers
+        contextByMsgType.stop();
+        context.stop();
+    }
+}
@@ -6,7 +6,7 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.remote.rpc.messages;
+package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 
@@ -14,7 +14,6 @@ public class Monitor {
     private final ActorRef actorRef;
 
     public Monitor(ActorRef actorRef){
-
         this.actorRef = actorRef;
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/UnifiedConfig.java
new file mode 100644 (file)
index 0000000..62b6055
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+import com.typesafe.config.Config;
+
+/**
+ * Represents a unified view of configuration.
+ * <p/>
+ * It merges configuration from:
+ * <ul>
+ *     <li>Config subsystem</li>
+ *     <li>Akka configuration files</li>
+ * </ul>
+ *
+ * Configurations defined in config subsystem takes precedence.
+ */
+public interface UnifiedConfig {
+
+    /**
+     * Returns an immutable instance of unified configuration
+     * @return
+     */
+    public Config get();
+}
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.common.reporting;
+package org.opendaylight.controller.cluster.reporting;
 
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
@@ -21,7 +21,7 @@ import com.codahale.metrics.MetricRegistry;
 public class MetricsReporter implements AutoCloseable{
 
     private final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
-    private final String DOMAIN = "org.opendaylight.controller";
+    private final String DOMAIN = "org.opendaylight.controller.actor.metric";
 
     public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java
deleted file mode 100644 (file)
index c6d3625..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.common.actor;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.BoundedDequeBasedMailbox;
-import akka.dispatch.MailboxType;
-import akka.dispatch.ProducesMessageQueue;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.opendaylight.controller.common.reporting.MetricsReporter;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
-
-    private MeteredMessageQueue queue;
-    private Integer capacity;
-    private FiniteDuration pushTimeOut;
-    private ActorPath actorPath;
-    private MetricsReporter reporter;
-
-    private final String QUEUE_SIZE = "queue-size";
-    private final String CAPACITY = "mailbox-capacity";
-    private final String TIMEOUT  = "mailbox-push-timeout-time";
-    private final Long DEFAULT_TIMEOUT = 10L;
-
-    public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
-        Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
-        this.capacity = config.getInt(CAPACITY);
-        Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
-
-        Long timeout = -1L;
-        if ( config.hasPath(TIMEOUT) ){
-            timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
-        } else {
-            timeout = DEFAULT_TIMEOUT;
-        }
-        Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0");
-        this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
-
-        reporter = MetricsReporter.getInstance();
-    }
-
-
-    @Override
-    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
-        this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
-        monitorQueueSize(owner, this.queue);
-        return this.queue;
-    }
-
-    private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
-        if (owner.isEmpty()) {
-            return; //there's no actor to monitor
-        }
-        actorPath = owner.get().path();
-        String actorInstanceId = Integer.toString(owner.get().hashCode());
-
-        MetricRegistry registry = reporter.getMetricsRegistry();
-        String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
-
-        if (registry.getMetrics().containsKey(actorName))
-            return; //already registered
-
-        registry.register(actorName,
-                new Gauge<Integer>() {
-                    @Override
-                    public Integer getValue() {
-                        return monitoredQueue.size();
-                    }
-                });
-    }
-
-
-    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
-
-        public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
-            super(capacity, pushTimeOut);
-        }
-    }
-
-}
-
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/CommonConfigTest.java
new file mode 100644 (file)
index 0000000..cd77ab2
--- /dev/null
@@ -0,0 +1,43 @@
+package org.opendaylight.controller.cluster.common.actor;
+
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CommonConfigTest {
+
+    @Test
+    public void testCommonConfigDefaults(){
+        CommonConfig config = new CommonConfig.Builder<>("testsystem").build();
+
+        assertNotNull(config.getActorSystemName());
+        assertNotNull(config.getMailBoxCapacity());
+        assertNotNull(config.getMailBoxName());
+        assertNotNull(config.getMailBoxPushTimeout());
+        assertNotNull(config.isMetricCaptureEnabled());
+    }
+
+    @Test
+    public void testCommonConfigOverride(){
+
+        int expectedCapacity = 123;
+        String timeoutValue = "1000ms";
+        CommonConfig config = new CommonConfig.Builder<>("testsystem")
+                .mailboxCapacity(expectedCapacity)
+                .mailboxPushTimeout(timeoutValue)
+                .metricCaptureEnabled(true)
+                .build();
+
+        assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+
+        FiniteDuration expectedTimeout = FiniteDuration.create(1000, TimeUnit.MILLISECONDS);
+        assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+        assertTrue(config.isMetricCaptureEnabled());
+    }
+}
\ No newline at end of file
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.common.actor;
+package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -25,11 +25,13 @@ import java.util.concurrent.locks.ReentrantLock;
 public class MeteredBoundedMailboxTest {
 
     private static ActorSystem actorSystem;
+    private static CommonConfig config;
     private final ReentrantLock lock = new ReentrantLock();
 
     @Before
     public void setUp() throws Exception {
-        actorSystem = ActorSystem.create("testsystem");
+        config = new CommonConfig.Builder<>("testsystem").build();
+        actorSystem = ActorSystem.create("testsystem", config.get());
     }
 
     @After
@@ -39,15 +41,14 @@ public class MeteredBoundedMailboxTest {
     }
 
     @Test
-    public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException {
+    public void shouldSendMsgToDeadLetterWhenQueueIsFull() throws InterruptedException {
         final JavaTestKit mockReceiver = new JavaTestKit(actorSystem);
         actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
 
 
         final FiniteDuration TWENTY_SEC = new FiniteDuration(20, TimeUnit.SECONDS);
 
-        String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
-        ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
+        ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()),
                                                      "pingpongactor");
 
         actorSystem.mailboxes().settings();
index 0392dec3dd43075c2df343526bafa5cbd59e4ada..a8a6513bb6e5dd725ea20f22d17f8b369b7a97b0 100644 (file)
@@ -1,7 +1,7 @@
 testsystem {
 
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 10
     mailbox-push-timeout-time = 100ms
   }
index 3481bae8ae05851cbc6509d41cdc65bf7a561715..43696c765202e9fdfc71f695e232ee25fffaf442 100644 (file)
@@ -1,8 +1,5 @@
-testsystem {
-
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
-}
\ No newline at end of file
index 72da6304e54f2c19499189fa13ca0e74e30172db..fbb666a9caab32c90646796078132d1ed13c8bfa 100644 (file)
@@ -56,6 +56,9 @@
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
                         <name>dom-broker</name>
                     </dom-broker>
+                    <enable-metric-capture xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">true</enable-metric-capture>
+                    <actor-system-name xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">odl-cluster-rpc</actor-system-name>
+                    <bounded-mailbox-capacity xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">1000</bounded-mailbox-capacity>
                 </module>
 
             </modules>
index 5a2116b50f92070687736329e95441a246ed49b0..f632b9cc839f97d5e33dd19ff51754b292c7682f 100644 (file)
@@ -1,10 +1,13 @@
 
 odl-cluster-data {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
-  }    
+  }
+
+  metric-capture-enabled = true
+
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -44,10 +47,13 @@ odl-cluster-data {
 
 odl-cluster-rpc {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
   }
+
+  metric-capture-enabled = true
+
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -62,7 +68,7 @@ odl-cluster-rpc {
     }
 
     cluster {
-      seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
+      seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"]
 
       auto-down-unreachable-after = 10s
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
deleted file mode 100644 (file)
index b326d61..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.osgi.framework.BundleContext;
-
-import java.io.File;
-
-public class ActorSystemFactory {
-
-    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
-    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
-    public static final String CONFIGURATION_NAME = "odl-cluster-data";
-
-    private static volatile ActorSystem actorSystem = null;
-
-    public static final ActorSystem getInstance(){
-        return actorSystem;
-    }
-
-    /**
-     * This method should be called only once during initialization
-     *
-     * @param bundleContext
-     */
-    public static final ActorSystem createInstance(final BundleContext bundleContext) {
-        if(actorSystem == null) {
-            // Create an OSGi bundle classloader for actor system
-            BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-                Thread.currentThread().getContextClassLoader());
-            synchronized (ActorSystemFactory.class) {
-                // Double check
-
-                if (actorSystem == null) {
-                    ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
-                        ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
-                    system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-                    actorSystem = system;
-                }
-            }
-        }
-
-        return actorSystem;
-    }
-
-
-    private static final Config readAkkaConfiguration(){
-        File defaultConfigFile = new File(AKKA_CONF_PATH);
-        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
-        return ConfigFactory.parseFile(defaultConfigFile);
-    }
-}
index b3283a18b1baaf4c3c7530570b8ae8a09512211f..f1c0df4c3ad2a336a6aa8edc7282aa399f160c13 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.Props;
 import akka.japi.Creator;
 
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
index 818f73392d1d880ad0dde5d84704c1e4c9d15c17..342611298c78b3ea5709473afaf8b2e2a19d4e9d 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
index 8739ed1966b618a3843c8e23e5671ea05eb48f15..a6f187d065504b21f2f4cf5a7b0d27007880d831 100644 (file)
@@ -9,22 +9,60 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
-
+import akka.actor.Props;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
 
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class DistributedDataStoreFactory {
+
+    public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf";
+    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+    public static final String CONFIGURATION_NAME = "odl-cluster-data";
+    private static AtomicReference<ActorSystem> actorSystem = new AtomicReference<>();
+
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DatastoreContext datastoreContext, BundleContext bundleContext) {
+                                                      DatastoreContext datastoreContext, BundleContext bundleContext) {
 
-        ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
+        ActorSystem actorSystem = getOrCreateInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
-            new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
-                    config, datastoreContext );
+                new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+                        config, datastoreContext);
+
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
         return dataStore;
     }
+
+    synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext) {
+
+        if (actorSystem.get() != null){
+            return actorSystem.get();
+        }
+        // Create an OSGi bundle classloader for actor system
+        BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                Thread.currentThread().getContextClassLoader());
+
+        ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader);
+        system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+        actorSystem.set(system);
+        return system;
+    }
+
+
+    private static final Config readAkkaConfiguration() {
+        File defaultConfigFile = new File(AKKA_CONF_PATH);
+        Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing");
+        return ConfigFactory.parseFile(defaultConfigFile);
+    }
 }
index 7d570046d406feec976620f9215398475711a756..a1858f5f91c82fe20f56461138ba19daa4fa6681 100644 (file)
@@ -26,6 +26,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -138,6 +140,9 @@ public class Shard extends RaftActor {
         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
+        if (isMetricsCaptureEnabled()) {
+            getContext().become(new MeteringBehavior(this));
+        }
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -406,7 +411,12 @@ public class Shard extends RaftActor {
         ActorRef transactionChain = getContext().actorOf(
                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
-            getSelf());
+                getSelf());
+    }
+
+    private boolean isMetricsCaptureEnabled(){
+        CommonConfig config = new CommonConfig(getContext().system().settings().config());
+        return config.isMetricCaptureEnabled();
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,
index 58cdefe5371d2b58be6e7c9f5e461734f34acd07..13ecaa5619614e133b8afe5124acc25a9bacfc84 100644 (file)
@@ -18,6 +18,8 @@ import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -48,7 +50,7 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedActorWithMetering {
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
index 65f865b0c43ecdd6da13754605bccdc91a6f472e..b810ed9575a7af8d1f85a7b337639e2f3dfc72ca 100644 (file)
@@ -16,6 +16,8 @@ import akka.japi.Creator;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
 
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
index 484bd54a0743616ebb3fdb3bd95f0c1c253b1996..8fe94cf468b6b63e78128a5f44ca2b1b7cebdd55 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
index 2dce6a1079c4fdbb0a8e2fa090fa018908d3f5ce..e3ae5dac7b7950f3fc300a3189bc83922df24033 100644 (file)
@@ -18,6 +18,7 @@ import akka.japi.Creator;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
index c29f93bb073cb712df045bf4839cc247a2b49111..6be6cda5d3d741de221190e55fcf71d042e4c24a 100644 (file)
@@ -1,10 +1,13 @@
 
 odl-cluster-data {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
   }
+
+  metric-capture-enabled = true
+
   akka {
     loggers = ["akka.event.slf4j.Slf4jLogger"]
     cluster {
index 82bc5e29bc98465624ad181a6e74b06942e9ed1b..e19a76703f69f61ec8df33decddf483cfc6e7192 100644 (file)
@@ -41,13 +41,13 @@ module distributed-datastore-provider {
             range "1..max";
         }
     }
-    
+
     typedef operation-timeout-type {
         type uint16 {
             range "5..max";
         }
     }
-    
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
@@ -72,20 +72,32 @@ module distributed-datastore-provider {
             type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store executor.";
          }
-            
+
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
             type non-zero-uint16-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
-         
+
          leaf operation-timeout-in-seconds {
             default 5;
             type operation-timeout-type;
             description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
          }
+
+         leaf enable-metric-capture {
+            default false;
+            type boolean;
+            description "Enable or disable metric capture.";
+         }
+
+         leaf bounded-mailbox-capacity {
+             default 1000;
+             type non-zero-uint16-type;
+             description "Max queue size that an actor's mailbox can reach";
+         }
     }
-    
+
     // Augments the 'configuration' choice node under modules/module.
     augment "/config:modules/config:module/config:configuration" {
         case distributed-config-datastore-provider {
index 6851b1b72ce39d4c011cb0771b95014bb31ade32..794b376af8c694e3773c353dbd99668713ef9f10 100644 (file)
@@ -15,7 +15,7 @@ akka {
     }
 }
 bounded-mailbox {
-  mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+  mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
   mailbox-capacity = 1000
   mailbox-push-timeout-time = 100ms
 }
index 8d454c4bd6e43ef54aa8c8a24f659e6be1244f68..0fb468be868be7dfa3081530b5068bdef13882cc 100644 (file)
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-slf4j_${scala.version}</artifactId>
   </dependency>
+
+      <dependency>
+          <groupId>com.typesafe.akka</groupId>
+          <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+      </dependency>
     <!-- SAL Dependencies -->
 
     <dependency>
index 2be8ba47b99f9881304f58eeee3dede71497821c..c2e8125df2b5be843f2a6ed560ea3c211817c1b1 100644 (file)
@@ -1,5 +1,7 @@
 package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
 
+import org.opendaylight.controller.cluster.common.actor.DefaultAkkaConfigurationReader;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.osgi.framework.BundleContext;
@@ -22,7 +24,14 @@ public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.ya
   @Override
   public java.lang.AutoCloseable createInstance() {
     Broker broker = getDomBrokerDependency();
-    return RemoteRpcProviderFactory.createInstance(broker, bundleContext);
+
+    RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(getActorSystemName())
+                              .metricCaptureEnabled(getEnableMetricCapture())
+                              .mailboxCapacity(getBoundedMailboxCapacity())
+                              .withConfigReader(new DefaultAkkaConfigurationReader())
+                              .build();
+
+    return RemoteRpcProviderFactory.createInstance(broker, bundleContext, config);
   }
 
   public void setBundleContext(BundleContext bundleContext) {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java
deleted file mode 100644 (file)
index 66593ae..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
-
-public abstract class AbstractUntypedActor extends UntypedActor {
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
-
-
-    public AbstractUntypedActor(){
-        LOG.debug("Actor created {}", getSelf());
-        getContext().
-            system().
-            actorSelection("user/termination-monitor").
-            tell(new Monitor(getSelf()), getSelf());
-    }
-
-    @Override public void onReceive(Object message) throws Exception {
-        LOG.debug("Received message {}", message);
-        handleReceive(message);
-        LOG.debug("Done handling message {}", message);
-    }
-
-    protected abstract void handleReceive(Object message) throws Exception;
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java
deleted file mode 100644 (file)
index da0d628..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-public class ActorConstants {
-  public static final String RPC_BROKER = "rpc-broker";
-  public static final String RPC_REGISTRY = "rpc-registry";
-  public static final String RPC_MANAGER = "rpc";
-
-  public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker";
-  public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry";
-  public static final String RPC_MANAGER_PATH = "/user/rpc";
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
deleted file mode 100644 (file)
index 6a442c5..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-import akka.actor.ActorSystem;
-import akka.osgi.BundleDelegatingClassLoader;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.BundleContext;
-
-
-public class ActorSystemFactory {
-
-    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc";
-    public static final String CONFIGURATION_NAME = "odl-cluster-rpc";
-
-    private static volatile ActorSystem actorSystem = null;
-
-  public static final ActorSystem getInstance(){
-     return actorSystem;
-  }
-
-  /**
-   * This method should be called only once during initialization
-   *
-   * @param bundleContext
-   */
-  public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) {
-    if(actorSystem == null) {
-      // Create an OSGi bundle classloader for actor system
-      BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-          Thread.currentThread().getContextClassLoader());
-      synchronized (ActorSystemFactory.class) {
-        // Double check
-        if (actorSystem == null) {
-          ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME,
-              akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader);
-          actorSystem = system;
-        }
-      }
-    } else {
-      throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system");
-    }
-  }
-
-}
index 7d7dbf0f3a58bc404882ad78186340d8eef2aba9..0f84abb22e8e6ca92d36e5486dac0f08ba699720 100644 (file)
@@ -1,42 +1,40 @@
 package org.opendaylight.controller.remote.rpc;
 
-import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
-import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 
 import java.util.Collections;
 import java.util.Set;
 
+import static akka.pattern.Patterns.ask;
+
 public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefaultImplementation {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
     private final ActorRef rpcBroker;
     private final SchemaContext schemaContext;
+    private final RemoteRpcProviderConfig config;
 
-    public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+    public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext, RemoteRpcProviderConfig config) {
         this.rpcBroker = rpcBroker;
         this.schemaContext = schemaContext;
+        this.config = config;
     }
 
     @Override
@@ -63,8 +61,7 @@ public class RemoteRpcImplementation implements RpcImplementation, RoutedRpcDefa
 
         final SettableFuture<RpcResult<CompositeNode>> listenableFuture = SettableFuture.create();
 
-        scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg,
-                new Timeout(ActorUtil.ASK_DURATION));
+        scala.concurrent.Future<Object> future = ask(rpcBroker, rpcMsg, config.getAskDuration());
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index d088f2284d65904cc90d3aa3d6f39a4ca2cc99aa..8b4ce31d2ea0ee7e82352e72fd5249b37e6873a6 100644 (file)
@@ -31,21 +31,25 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
 
-  private final ActorSystem actorSystem;
   private final RpcProvisionRegistry rpcProvisionRegistry;
+
+  private ActorSystem actorSystem;
   private Broker.ProviderSession brokerSession;
   private SchemaContext schemaContext;
   private ActorRef rpcManager;
+  private RemoteRpcProviderConfig config;
 
 
   public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
     this.actorSystem = actorSystem;
     this.rpcProvisionRegistry = rpcProvisionRegistry;
+    this.config = new RemoteRpcProviderConfig(actorSystem.settings().config());
   }
 
   @Override
   public void close() throws Exception {
-    this.actorSystem.shutdown();
+    if (this.actorSystem != null)
+      this.actorSystem.shutdown();
   }
 
   @Override
@@ -60,17 +64,17 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext
   }
 
   private void start() {
-    LOG.info("Starting all rpc listeners and actors.");
-    // Create actor to handle and sync routing table in cluster
+    LOG.info("Starting remote rpc service...");
+
     SchemaService schemaService = brokerSession.getService(SchemaService.class);
     schemaContext = schemaService.getGlobalContext();
 
-    rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER);
+    rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, brokerSession, rpcProvisionRegistry),
+                                     config.getRpcManagerName());
 
-    LOG.debug("Rpc actors are created.");
+    LOG.debug("rpc manager started");
   }
 
-
   @Override
   public void onGlobalContextUpdated(SchemaContext schemaContext) {
     this.schemaContext = schemaContext;
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
new file mode 100644 (file)
index 0000000..3f6d42d
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.util.Timeout;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class RemoteRpcProviderConfig extends CommonConfig {
+
+    protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
+    protected static final String TAG_RPC_REGISTRY_NAME = "registry-name";
+    protected static final String TAG_RPC_MGR_NAME = "rpc-manager-name";
+    protected static final String TAG_RPC_BROKER_PATH = "rpc-broker-path";
+    protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
+    protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
+    protected static final String TAG_ASK_DURATION = "ask-duration";
+    private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+
+    //locally cached values
+    private Timeout cachedAskDuration;
+    private FiniteDuration cachedGossipTickInterval;
+
+    public RemoteRpcProviderConfig(Config config){
+        super(config);
+    }
+
+    public String getRpcBrokerName(){
+        return get().getString(TAG_RPC_BROKER_NAME);
+    }
+
+    public String getRpcRegistryName(){
+        return get().getString(TAG_RPC_REGISTRY_NAME);
+    }
+
+    public String getRpcManagerName(){
+        return get().getString(TAG_RPC_MGR_NAME);
+    }
+
+    public String getRpcBrokerPath(){
+        return get().getString(TAG_RPC_BROKER_PATH);
+    }
+
+    public String getRpcRegistryPath(){
+        return get().getString(TAG_RPC_REGISTRY_PATH);
+
+    }
+
+    public String getRpcManagerPath(){
+        return get().getString(TAG_RPC_MGR_PATH);
+    }
+
+
+    public Timeout getAskDuration(){
+        if (cachedAskDuration != null){
+            return cachedAskDuration;
+        }
+
+        cachedAskDuration = new Timeout(new FiniteDuration(
+                get().getDuration(TAG_ASK_DURATION, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS));
+
+        return cachedAskDuration;
+    }
+
+    public FiniteDuration getGossipTickInterval(){
+        if (cachedGossipTickInterval != null) {
+            return cachedGossipTickInterval;
+        }
+
+        cachedGossipTickInterval = new FiniteDuration(
+                get().getDuration(TAG_GOSSIP_TICK_INTERVAL, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+        return cachedGossipTickInterval;
+    }
+
+    public static class Builder extends CommonConfig.Builder<Builder>{
+
+        public Builder(String actorSystemName){
+            super(actorSystemName);
+
+            //Actor names
+            configHolder.put(TAG_RPC_BROKER_NAME, "broker");
+            configHolder.put(TAG_RPC_REGISTRY_NAME, "registry");
+            configHolder.put(TAG_RPC_MGR_NAME, "rpc");
+
+            //Actor paths
+            configHolder.put(TAG_RPC_BROKER_PATH, "/user/rpc/broker");
+            configHolder.put(TAG_RPC_REGISTRY_PATH, "/user/rpc/registry");
+            configHolder.put(TAG_RPC_MGR_PATH, "/user/rpc");
+
+            //durations
+            configHolder.put(TAG_ASK_DURATION, "15s");
+            configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
+
+        }
+
+        public RemoteRpcProviderConfig build(){
+            return new RemoteRpcProviderConfig(merge());
+        }
+    }
+
+
+}
index 0e6b795c058877069640a848fe1144575db37443..2e355d4f5146b13c31ee9f4a66a1bbed757d9354 100644 (file)
@@ -8,19 +8,43 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-
-import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader;
+import akka.actor.ActorSystem;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.Config;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemoteRpcProviderFactory {
-    public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProviderFactory.class);
+
+    public static RemoteRpcProvider createInstance(
+            final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
 
-      ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader());
       RemoteRpcProvider rpcProvider =
-          new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
+          new RemoteRpcProvider(createActorSystem(bundleContext, config), (RpcProvisionRegistry) broker);
+
       broker.registerProvider(rpcProvider);
       return rpcProvider;
     }
+
+    private static ActorSystem createActorSystem(BundleContext bundleContext, RemoteRpcProviderConfig config){
+
+        // Create an OSGi bundle classloader for actor system
+        BundleDelegatingClassLoader classLoader =
+                new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                        Thread.currentThread().getContextClassLoader());
+
+        Config actorSystemConfig = config.get();
+        LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
+
+        if (config.isMetricCaptureEnabled()) {
+            LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
+                    config.getActorSystemName());
+        }
+
+        return ActorSystem.create(config.getActorSystemName(), actorSystemConfig, classLoader);
+    }
 }
index 2aca655d2628eb9d89295d09419d0cd44f7491d7..6b02235dc7d218c967fbd1e7d1d1a087d3304a30 100644 (file)
@@ -8,25 +8,26 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static akka.pattern.Patterns.ask;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Pair;
-import akka.util.Timeout;
-
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
-import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
 import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
-import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -36,16 +37,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Future;
 
+import static akka.pattern.Patterns.ask;
+
 /**
  * Actor to initiate execution of remote RPC on other nodes of the cluster.
  */
@@ -56,12 +54,14 @@ public class RpcBroker extends AbstractUntypedActor {
     private final Broker.ProviderSession brokerSession;
     private final ActorRef rpcRegistry;
     private final SchemaContext schemaContext;
+    private final RemoteRpcProviderConfig config;
 
     private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
             SchemaContext schemaContext) {
         this.brokerSession = brokerSession;
         this.rpcRegistry = rpcRegistry;
         this.schemaContext = schemaContext;
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
     }
 
     public static Props props(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
@@ -85,8 +85,7 @@ public class RpcBroker extends AbstractUntypedActor {
                 null, msg.getRpc(), msg.getIdentifier());
         RpcRegistry.Messages.FindRouters findMsg = new RpcRegistry.Messages.FindRouters(routeId);
 
-        scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg,
-                new Timeout(ActorUtil.LOCAL_ASK_DURATION));
+        scala.concurrent.Future<Object> future = ask(rpcRegistry, findMsg, config.getAskDuration());
 
         final ActorRef sender = getSender();
         final ActorRef self = self();
@@ -129,8 +128,7 @@ public class RpcBroker extends AbstractUntypedActor {
         ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(),
                 schemaContext), msg.getRpc());
 
-        scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg,
-                new Timeout(ActorUtil.REMOTE_ASK_DURATION));
+        scala.concurrent.Future<Object> future = ask(logic.select(), executeMsg, config.getAskDuration());
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index d4da226b9dc4278cd508e83082283a3163c6615c..4ae9c2e4d097b36d71a9a3207e14ee35fcff3c83 100644 (file)
@@ -15,11 +15,9 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.japi.Creator;
 import akka.japi.Function;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -27,6 +25,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+
 import java.util.Set;
 
 /**
@@ -43,16 +42,19 @@ public class RpcManager extends AbstractUntypedActor {
   private ActorRef rpcBroker;
   private ActorRef rpcRegistry;
   private final Broker.ProviderSession brokerSession;
+  private final RemoteRpcProviderConfig config;
   private RpcListener rpcListener;
   private RoutedRpcListener routeChangeListener;
   private RemoteRpcImplementation rpcImplementation;
   private final RpcProvisionRegistry rpcProvisionRegistry;
 
   private RpcManager(SchemaContext schemaContext,
-                     Broker.ProviderSession brokerSession, RpcProvisionRegistry rpcProvisionRegistry) {
+                     Broker.ProviderSession brokerSession,
+                     RpcProvisionRegistry rpcProvisionRegistry) {
     this.schemaContext = schemaContext;
     this.brokerSession = brokerSession;
     this.rpcProvisionRegistry = rpcProvisionRegistry;
+    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
 
     createRpcActors();
     startListeners();
@@ -60,7 +62,8 @@ public class RpcManager extends AbstractUntypedActor {
 
 
   public static Props props(final SchemaContext schemaContext,
-                            final Broker.ProviderSession brokerSession, final RpcProvisionRegistry rpcProvisionRegistry) {
+                            final Broker.ProviderSession brokerSession,
+                            final RpcProvisionRegistry rpcProvisionRegistry) {
     return Props.create(new Creator<RpcManager>() {
       @Override
       public RpcManager create() throws Exception {
@@ -72,15 +75,13 @@ public class RpcManager extends AbstractUntypedActor {
   private void createRpcActors() {
     LOG.debug("Create rpc registry and broker actors");
 
-      Config conf = ConfigFactory.load();
-
     rpcRegistry =
             getContext().actorOf(Props.create(RpcRegistry.class).
-                withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
+                withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
 
     rpcBroker =
             getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
-                withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
+                withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
 
     RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
     rpcRegistry.tell(localRouter, self());
@@ -91,7 +92,7 @@ public class RpcManager extends AbstractUntypedActor {
 
     rpcListener = new RpcListener(rpcRegistry);
     routeChangeListener = new RoutedRpcListener(rpcRegistry);
-    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext, config);
 
     brokerSession.addRpcRegistrationListener(rpcListener);
     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
index a90f1e1ed251204d6d3e4a45b6516bb783dbb41a..abe2008c2936cb1ed12b8624f072b65c33d8c518 100644 (file)
@@ -12,7 +12,7 @@ import akka.actor.Terminated;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.messages.Monitor;
+import org.opendaylight.controller.cluster.common.actor.Monitor;
 
 public class TerminationMonitor extends UntypedActor{
     protected final LoggingAdapter LOG =
index 5109d316446b13158e3739824e653e0259929135..095d70926b90d3838a777cc14a6976b31ccd9c97 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -18,9 +17,10 @@ import akka.japi.Option;
 import akka.japi.Pair;
 import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import scala.concurrent.Future;
 
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
@@ -45,7 +45,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends UntypedActor {
+public class RpcRegistry extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
@@ -59,9 +59,11 @@ public class RpcRegistry extends UntypedActor {
      */
     private ActorRef localRouter;
 
+    private RemoteRpcProviderConfig config;
+
     public RpcRegistry() {
         bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
+        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
         log.info("Bucket store path = {}", bucketStore.path().toString());
     }
 
@@ -69,11 +71,9 @@ public class RpcRegistry extends UntypedActor {
         this.bucketStore = bucketStore;
     }
 
-    @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: message [{}]", message);
 
+    @Override
+    protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
         if (message instanceof SetLocalRouter)
@@ -108,7 +108,7 @@ public class RpcRegistry extends UntypedActor {
 
         Preconditions.checkState(localRouter != null, "Router must be set first");
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
     }
 
@@ -117,7 +117,7 @@ public class RpcRegistry extends UntypedActor {
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
 
     }
@@ -130,7 +130,7 @@ public class RpcRegistry extends UntypedActor {
     private void receiveGetRouter(FindRouters msg) {
         final ActorRef sender = getSender();
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
     }
 
@@ -151,7 +151,8 @@ public class RpcRegistry extends UntypedActor {
      * @param routeId
      * @return
      */
-    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    private Messages.FindRoutersReply createReplyWithRouters(
+            Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
 
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
         Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
@@ -184,7 +185,8 @@ public class RpcRegistry extends UntypedActor {
      * @param sender  client who asked to find the routers.
      * @return
      */
-    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+    private Mapper<Object, Void> getMapperToGetRouter(
+            final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
         return new Mapper<Object, Void>() {
             @Override
             public Void apply(Object replyMessage) {
index 3cdd924e8574afeee7b889f91905bbedd9a9e7f0..4dac456dc426d3493c8988355f40a79ca1df97ab 100644 (file)
@@ -11,7 +11,7 @@ import java.io.Serializable;
 
 public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
 
-    private Long version = System.currentTimeMillis();;
+    private Long version = System.currentTimeMillis();
 
     private T data;
 
index ff51f4fcfa671ff4ae71be72c6403b741e3de8dd..3de3fc00d0328215ef8f127e8272458f14326708 100644 (file)
@@ -12,11 +12,11 @@ import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.utils.ConditionalProbe;
 
 import java.util.HashMap;
@@ -45,14 +45,14 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends UntypedActor {
+public class BucketStore extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     /**
      * Bucket owned by the node
      */
-    private BucketImpl localBucket = new BucketImpl();;
+    private BucketImpl localBucket = new BucketImpl();
 
     /**
      * Buckets ownded by other known nodes in the cluster
@@ -71,20 +71,24 @@ public class BucketStore extends UntypedActor {
 
     private ConditionalProbe probe;
 
+    private final RemoteRpcProviderConfig config;
+
+    public BucketStore(){
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    }
+
     @Override
     public void preStart(){
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
         if ( provider instanceof ClusterActorRefProvider)
-            getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
+            getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
     }
 
-    @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
 
+    @Override
+    protected void handleReceive(Object message) throws Exception {
         if (probe != null) {
             probe.tell(message, getSelf());
         }
@@ -100,17 +104,16 @@ public class BucketStore extends UntypedActor {
             receiveGetLocalBucket();
         } else if (message instanceof GetBucketsByMembers) {
             receiveGetBucketsByMembers(
-                ((GetBucketsByMembers) message).getMembers());
+                    ((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
             receiveUpdateRemoteBuckets(
-                ((UpdateRemoteBuckets) message).getBuckets());
+                    ((UpdateRemoteBuckets) message).getBuckets());
         } else {
             log.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
-
     }
 
     /**
index f6ce5e55f3ee63602fc92529e92d6e93d0ff9bb3..85c6ebe26f859e0751122d4ab60065a0f1b48aba 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRefProvider;
 import akka.actor.ActorSelection;
 import akka.actor.Address;
 import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
@@ -21,7 +20,8 @@ import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -59,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go
  *
  */
 
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
@@ -79,7 +79,11 @@ public class Gossiper extends UntypedActor {
 
     private Boolean autoStartGossipTicks = true;
 
-    public Gossiper(){}
+    private RemoteRpcProviderConfig config;
+
+    public Gossiper(){
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    }
 
     /**
      * Helpful for testing
@@ -106,7 +110,7 @@ public class Gossiper extends UntypedActor {
         if (autoStartGossipTicks) {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
-                    ActorUtil.GOSSIP_TICK_INTERVAL,                 //interval
+                    config.getGossipTickInterval(),                 //interval
                     getSelf(),                                       //target
                     new Messages.GossiperMessages.GossipTick(),      //message
                     getContext().dispatcher(),                       //execution context
@@ -124,22 +128,19 @@ public class Gossiper extends UntypedActor {
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+    protected void handleReceive(Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
         if (message instanceof GossipTick)
             receiveGossipTick();
 
-        //Message from remote gossiper with its bucket versions
+            //Message from remote gossiper with its bucket versions
         else if (message instanceof GossipStatus)
             receiveGossipStatus((GossipStatus) message);
 
-        //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
-        //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
-        //message with its local versions
+            //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+            //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+            //message with its local versions
         else if (message instanceof GossipEnvelope)
             receiveGossip((GossipEnvelope) message);
 
@@ -196,7 +197,7 @@ public class Gossiper extends UntypedActor {
     void receiveGossipTick(){
         if (clusterMembers.size() == 0) return; //no members to send gossip status to
 
-        Address remoteMemberToGossipTo = null;
+        Address remoteMemberToGossipTo;
 
         if (clusterMembers.size() == 1)
             remoteMemberToGossipTo = clusterMembers.get(0);
@@ -229,7 +230,7 @@ public class Gossiper extends UntypedActor {
 
         final ActorRef sender = getSender();
         Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
 
@@ -271,7 +272,7 @@ public class Gossiper extends UntypedActor {
     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
 
         Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
+                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
     }
 
@@ -284,7 +285,7 @@ public class Gossiper extends UntypedActor {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
-                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
         //Find gossiper on remote system
         ActorSelection remoteRef = getContext().system().actorSelection(
@@ -382,8 +383,6 @@ public class Gossiper extends UntypedActor {
                             localIsOlder.add(address);
                         else if (localVersions.get(address) > remoteVersions.get(address))
                             localIsNewer.add(address);
-                        else
-                            continue;
                     }
 
                     if (!localIsOlder.isEmpty())
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java
deleted file mode 100644 (file)
index e2baffa..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-package org.opendaylight.controller.remote.rpc.utils;
-
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.TimeUnit;
-
-public class ActorUtil {
-    public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
-    public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
-    public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
-    public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
-    public static final String MAILBOX = "bounded-mailbox";
-}
index 266832a0ab0491dfd5d58d2c0414c2733ef0300b..39ac9912746b35ec1f8633e1c0a439c5bb956e3d 100644 (file)
@@ -39,7 +39,7 @@ odl-cluster-data {
 
 odl-cluster-rpc {
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 100ms
   }
index 08db5c0043063bcfb17973f4279cdf31fb96d0fc..334d872c44845b42bd25662f9a143be4d54e0e4c 100644 (file)
@@ -5,11 +5,11 @@ module remote-rpc-connector {
 
     import config { prefix config; revision-date 2013-04-05; }
     import opendaylight-md-sal-dom {prefix dom;}
-    
+
     description
         "This module contains the base YANG definitions for
                  the remote routed rpc";
+
     revision "2014-07-07" {
         description
             "Initial revision";
@@ -25,7 +25,7 @@ module remote-rpc-connector {
     augment "/config:modules/config:module/config:configuration" {
         case remote-rpc-connector {
             when "/config:modules/config:module/config:type = 'remote-rpc-connector'";
-            
+
             container dom-broker {
                 uses config:service-ref {
                     refine type {
@@ -34,6 +34,24 @@ module remote-rpc-connector {
                     }
                 }
             }
+
+            leaf enable-metric-capture {
+                default false;
+                type boolean;
+                description "Enable or disable metric capture.";
+            }
+
+            leaf actor-system-name {
+                default odl-cluster-rpc;
+                type string;
+                description "Name by which actor system is identified. Its also used to find relevant configuration";
+            }
+
+            leaf bounded-mailbox-capacity {
+                default 1000;
+                type uint16;
+                description "Max queue size that an actor's mailbox can reach";
+            }
         }
     }
 
index 8d886829aa4edceee15a2ac30aa3c358b02f2343..46406fd4feebad58c7546ec5df12e5bb1f2f8137 100644 (file)
@@ -8,17 +8,10 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableList;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -26,9 +19,9 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
@@ -36,12 +29,16 @@ import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
-import com.google.common.collect.ImmutableList;
-import com.typesafe.config.ConfigFactory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Base class for RPC tests.
@@ -70,8 +67,10 @@ public class AbstractRpcTest {
 
     @BeforeClass
     public static void setup() throws InterruptedException {
-        node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
-        node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+        RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+        RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+        node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+        node2 = ActorSystem.create("opendaylight-rpc", config2.get());
     }
 
     @AfterClass
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java
deleted file mode 100644 (file)
index cd1cd91..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc;
-
-
-import akka.actor.ActorSystem;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.Test;
-import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ActorSystemFactoryTest {
-  ActorSystem system = null;
-
-  @Test
-  public void testActorSystemCreation(){
-    BundleContext context = mock(BundleContext.class);
-    when(context.getBundle()).thenReturn(mock(Bundle.class));
-
-    AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class);
-    when(reader.read()).thenReturn(ConfigFactory.load());
-
-    ActorSystemFactory.createInstance(context, reader);
-    system = ActorSystemFactory.getInstance();
-    Assert.assertNotNull(system);
-    // Check illegal state exception
-
-    try {
-      ActorSystemFactory.createInstance(context, reader);
-      fail("Illegal State exception should be thrown, while creating actor system second time");
-    } catch (IllegalStateException e) {
-    }
-  }
-
-  @After
-  public void cleanup() throws InterruptedException {
-    if(system != null) {
-      system.shutdown();
-    }
-  }
-}
index 6c3a57b3448e23ac485aa04518c788f2fad390ce..49451dd0db99114141c289750cf37f7b12e036c1 100644 (file)
@@ -8,27 +8,26 @@
 
 package org.opendaylight.controller.remote.rpc;
 
-import static org.junit.Assert.assertEquals;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.controller.xml.codec.XmlUtils;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import akka.testkit.JavaTestKit;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import static org.junit.Assert.assertEquals;
 
 /***
  * Unit tests for RemoteRpcImplementation.
@@ -42,7 +41,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             final CompositeNode input = makeRPCInput("foo");
             final CompositeNode output = makeRPCOutput("bar");
@@ -68,7 +67,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             QName instanceQName = new QName(new URI("ns"), "instance");
             YangInstanceIdentifier identifier = YangInstanceIdentifier.of(instanceQName);
@@ -99,7 +98,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             final CompositeNode input = makeRPCInput("foo");
 
@@ -125,7 +124,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
         final AtomicReference<AssertionError> assertError = new AtomicReference<>();
         try {
             RemoteRpcImplementation rpcImpl = new RemoteRpcImplementation(
-                    probeReg1.getRef(), schemaContext);
+                    probeReg1.getRef(), schemaContext, getConfig());
 
             final CompositeNode input = makeRPCInput("foo");
 
@@ -182,4 +181,8 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
 
         return invokeRpcMsg;
     }
+
+    private RemoteRpcProviderConfig getConfig(){
+        return new RemoteRpcProviderConfig.Builder("unit-test").build();
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfigTest.java
new file mode 100644 (file)
index 0000000..ae75252
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class RemoteRpcProviderConfigTest {
+
+    @Test
+    public void testConfigDefaults() {
+
+        Config c = ConfigFactory.parseFile(new File("application.conf"));
+        RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
+
+        //Assert on configurations from common config
+        Assert.assertFalse(config.isMetricCaptureEnabled()); //should be disabled by default
+        Assert.assertNotNull(config.getMailBoxCapacity());
+        Assert.assertNotNull(config.getMailBoxName());
+        Assert.assertNotNull(config.getMailBoxPushTimeout());
+
+        //rest of the configurations should be set
+        Assert.assertNotNull(config.getActorSystemName());
+        Assert.assertNotNull(config.getRpcBrokerName());
+        Assert.assertNotNull(config.getRpcBrokerPath());
+        Assert.assertNotNull(config.getRpcManagerName());
+        Assert.assertNotNull(config.getRpcManagerPath());
+        Assert.assertNotNull(config.getRpcRegistryName());
+        Assert.assertNotNull(config.getRpcRegistryPath());
+        Assert.assertNotNull(config.getAskDuration());
+        Assert.assertNotNull(config.getGossipTickInterval());
+
+
+
+    }
+
+    @Test
+    public void testConfigCustomizations() {
+
+        AkkaConfigurationReader reader = new TestConfigReader();
+
+        final int expectedCapacity = 100;
+        String timeOutVal = "10ms";
+        FiniteDuration expectedTimeout = FiniteDuration.create(10, TimeUnit.MILLISECONDS);
+
+        RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test")
+                .metricCaptureEnabled(true)//enable metric capture
+                .mailboxCapacity(expectedCapacity)
+                .mailboxPushTimeout(timeOutVal)
+                .withConfigReader(reader)
+                .build();
+
+        Assert.assertTrue(config.isMetricCaptureEnabled());
+        Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+        Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+
+        //Now check this config inside an actor
+        ActorSystem system = ActorSystem.create("unit-test", config.get());
+        TestActorRef<ConfigTestActor> configTestActorTestActorRef =
+                TestActorRef.create(system, Props.create(ConfigTestActor.class));
+
+        ConfigTestActor actor = configTestActorTestActorRef.underlyingActor();
+        Config actorConfig = actor.getConfig();
+
+        config = new RemoteRpcProviderConfig(actorConfig);
+
+        Assert.assertTrue(config.isMetricCaptureEnabled());
+        Assert.assertEquals(expectedCapacity, config.getMailBoxCapacity().intValue());
+        Assert.assertEquals(expectedTimeout.toMillis(), config.getMailBoxPushTimeout().toMillis());
+    }
+
+    public static class ConfigTestActor extends UntypedActor {
+
+        private Config actorSystemConfig;
+
+        public ConfigTestActor() {
+            this.actorSystemConfig = getContext().system().settings().config();
+        }
+
+        @Override
+        public void onReceive(Object message) throws Exception {
+        }
+
+        /**
+         * Only for testing. NEVER expose actor's internal state like this.
+         *
+         * @return
+         */
+        public Config getConfig() {
+            return actorSystemConfig;
+        }
+    }
+
+    public static class TestConfigReader implements AkkaConfigurationReader {
+
+        @Override
+        public Config read() {
+            return ConfigFactory.parseResources("application.conf");
+
+        }
+    }
+}
\ No newline at end of file
index 8a7e4a039846205846e1b54b21981f78af843783..8b4599ca8ceac01684376b63be3dded36e354e2b 100644 (file)
@@ -13,7 +13,7 @@ package org.opendaylight.controller.remote.rpc;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.Config;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -33,11 +33,14 @@ import static org.mockito.Mockito.when;
 public class RemoteRpcProviderTest {
 
   static ActorSystem system;
-
+  static RemoteRpcProviderConfig moduleConfig;
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+    moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+    Config config = moduleConfig.get();
+    system = ActorSystem.create("odl-cluster-rpc", config);
+
   }
 
   @AfterClass
@@ -53,9 +56,14 @@ public class RemoteRpcProviderTest {
     SchemaService schemaService = mock(SchemaService.class);
     when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
     when(session.getService(SchemaService.class)).thenReturn(schemaService);
+
     rpcProvider.onSessionInitiated(session);
-    ActorRef actorRef = Await.result(system.actorSelection(ActorConstants.RPC_MANAGER_PATH).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
-        Duration.create(2, TimeUnit.SECONDS));
-    Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH));
+
+    ActorRef actorRef = Await.result(
+            system.actorSelection(
+                    moduleConfig.getRpcManagerPath()).resolveOne(Duration.create(1, TimeUnit.SECONDS)),
+                                                                 Duration.create(2, TimeUnit.SECONDS));
+
+    Assert.assertTrue(actorRef.path().toString().contains(moduleConfig.getRpcManagerPath()));
   }
 }
index 83f52930b2b07ac32c75f83f437f1048132b54df..f6f720eed0f62dd50393c3275c6e7d9cc8fab23d 100644 (file)
@@ -9,12 +9,12 @@ import akka.actor.ChildActorPath;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Predicate;
-import com.typesafe.config.ConfigFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
@@ -45,9 +45,12 @@ public class RpcRegistryTest {
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
-    node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
-    node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+    RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+    RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+    RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+    node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+    node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+    node3 = ActorSystem.create("opendaylight-rpc", config3.get());
   }
 
   @AfterClass
@@ -204,7 +207,10 @@ public class RpcRegistryTest {
         new ConditionalProbe(probe.getRef(), new Predicate() {
           @Override
           public boolean apply(@Nullable Object input) {
-            return clazz.equals(input.getClass());
+              if (input != null)
+                return clazz.equals(input.getClass());
+              else
+                  return false;
           }
         });
 
index 5c4af8d3da457c99344c0a26942c9d3456694f75..8e310815faed92617d626d14a5433bd61664a00b 100644 (file)
@@ -1,6 +1,6 @@
 odl-cluster-rpc{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -37,11 +37,12 @@ odl-cluster-rpc{
 }
 unit-test{
   akka {
-    loglevel = "INFO"
+    loglevel = "DEBUG"
     #loggers = ["akka.event.slf4j.Slf4jLogger"]
   }
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    #mailbox-capacity is specified in config subsystem
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -49,7 +50,7 @@ unit-test{
 
 memberA{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -82,7 +83,7 @@ memberA{
 }
 memberB{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
@@ -116,7 +117,7 @@ memberB{
 }
 memberC{
   bounded-mailbox {
-    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }