<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-parser-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-api</artifactId>
+ </dependency>
+
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ <configuration>
+ <codeGenerators>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl</codeGeneratorClass>
+ <outputBaseDir>${salGeneratorPath}</outputBaseDir>
+ </generator>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+ <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+ <additionalConfiguration>
+ <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+ </additionalConfiguration>
+ </generator>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+ <outputBaseDir>target/site/models</outputBaseDir>
+ </generator>
+ </codeGenerators>
+ <inspectDependencies>true</inspectDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<configuration>
<instructions>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
- <Export-Package>org.opendaylight.controller.cluster.*,org.opendaylight.common.actor,org.opendaylight.common.reporting,org.opendaylight.controller.protobuff.*,org.opendaylight.controller.xml.*</Export-Package>
- <Import-Package>*</Import-Package>
</instructions>
</configuration>
</plugin>
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.actor.ActorSystem;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Interface that provides an akka ActorSystem instance.
+ *
+ * @author Thomas Pantelis
+ */
+public interface ActorSystemProvider {
+ /**
+ * @return the current ActorSystem.
+ */
+ @Nonnull
+ ActorSystem getActorSystem();
+
+ /**
+ * Register a listener for ActorSystem lifecycle events.
+ *
+ * @param listener the ActorSystemProviderListener to register
+ * @return a ListenerRegistration instance to be used to unregister
+ */
+ ListenerRegistration<ActorSystemProviderListener> registerActorSystemProviderListener(
+ @Nonnull ActorSystemProviderListener listener);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster;
+
+import akka.actor.ActorSystem;
+import java.util.EventListener;
+
+/**
+ * Listener interface for notification of ActorSystem changes from an ActorSystemProvider.
+ *
+ * @author Thomas Pantelis
+ */
+public interface ActorSystemProviderListener extends EventListener {
+ /**
+ * Method called when the current actor system is about to be shutdown.
+ */
+ void onPreShutdownActorSystem();
+
+ /**
+ * Method called when the current actor system is shutdown and a new actor system is created. This method
+ * is always preceded by a call to {@link #onPreShutdownActorSystem}.
+ *
+ * @param actorSytem the new ActorSystem
+ */
+ void onNewActorSystem(ActorSystem actorSytem);
+}
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-
import java.util.HashMap;
import java.util.Map;
public abstract class AbstractConfig implements UnifiedConfig {
- private Config config;
+ private final Config config;
public AbstractConfig(Config config){
this.config = config;
return (T)this;
}
- protected Config merge(){
- if (fallback == null)
- fallback = ConfigFactory.load().getConfig(actorSystemName);
+ protected Config merge() {
+ Config config = ConfigFactory.parseMap(configHolder);
+ if (fallback != null) {
+ config = config.withFallback(fallback);
+ }
- return ConfigFactory.parseMap(configHolder).withFallback(fallback);
+ return config;
}
}
}
--- /dev/null
+module actor-system-provider-service {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service";
+ prefix "actor-system";
+
+ import config { prefix config; revision-date 2013-04-05; }
+
+ description "Akka actor system provider service definition";
+
+ revision "2015-10-05" {
+ description "Initial revision";
+ }
+
+ identity actor-system-provider-service {
+ base "config:service-type";
+ config:java-class "org.opendaylight.controller.cluster.ActorSystemProvider";
+ }
+}
\ No newline at end of file
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.JavaTestKit;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
public class MeteredBoundedMailboxTest {
private static ActorSystem actorSystem;
@Before
public void setUp() throws Exception {
- config = new CommonConfig.Builder<>("testsystem").build();
+ config = new CommonConfig.Builder<>("testsystem").withConfigReader(new AkkaConfigurationReader() {
+ @Override
+ public Config read() {
+ return ConfigFactory.load();
+ }
+ }).build();
actorSystem = ActorSystem.create("testsystem", config.get());
}
@After
public void tearDown() throws Exception {
- if (actorSystem != null)
- actorSystem.shutdown();
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ }
}
@Test
public void onReceive(Object message) throws Exception {
lock.lock();
try {
- if ("ping".equals(message))
+ if ("ping".equals(message)) {
getSender().tell("pong", getSelf());
+ }
} finally {
lock.unlock();
}
</operational-data-store>
</module>
+ <module>
+ <type xmlns:as="urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:impl">as:actor-system-provider-impl</type>
+ <name>actor-system-provider-impl</name>
+ </module>
+
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-operational-datastore-provider</type>
<name>distributed-operational-store-module</name>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
<name>yang-schema-service</name>
</operational-schema-service>
+ <operational-actor-system-provider>
+ <type xmlns:as="urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service">as:actor-system-provider-service</type>
+ <name>actor-system-provider</name>
+ </operational-actor-system-provider>
<operational-properties>
<persistent>false</persistent>
<shard-election-timeout-factor>20</shard-election-timeout-factor>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
<name>yang-schema-service</name>
</config-schema-service>
+ <config-actor-system-provider>
+ <type xmlns:as="urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service">as:actor-system-provider-service</type>
+ <name>actor-system-provider</name>
+ </config-actor-system-provider>
<config-properties>
<shard-election-timeout-factor>20</shard-election-timeout-factor>
</config-properties>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
<name>dom-broker</name>
</dom-broker>
+ <actor-system-provider>
+ <type xmlns:as="urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service">as:actor-system-provider-service</type>
+ <name>actor-system-provider</name>
+ </actor-system-provider>
<enable-metric-capture xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">true</enable-metric-capture>
- <actor-system-name xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">odl-cluster-rpc</actor-system-name>
<bounded-mailbox-capacity xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">1000</bounded-mailbox-capacity>
</module>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <service>
+ <type xmlns:as="urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service">as:actor-system-provider-service</type>
+ <instance>
+ <name>actor-system-provider</name>
+ <provider>/modules/module[type='actor-system-provider-impl'][name='actor-system-provider-impl']</provider>
+ </instance>
+ </service>
<service>
<type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
<instance>
</configuration>
<required-capabilities>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:concurrent-data-broker?module=odl-concurrent-data-broker-cfg&revision=2014-11-24</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service?module=actor-system-provider-service&revision=2015-10-05</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:impl?module=actor-system-provider-impl&revision=2015-10-05</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-provider&revision=2014-06-12</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store?module=opendaylight-config-dom-datastore&revision=2014-06-17</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store?module=opendaylight-operational-dom-datastore&revision=2014-06-17</capability>
}
}
}
-
-odl-cluster-rpc {
- bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
- mailbox-capacity = 1000
- mailbox-push-timeout-time = 100ms
- }
-
- metric-capture-enabled = true
-
- akka {
- loglevel = "INFO"
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- logger-startup-timeout = 300s
-
- actor {
- provider = "akka.cluster.ClusterActorRefProvider"
-
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2551
- maximum-frame-size = 419430400
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://odl-cluster-rpc@127.0.0.1:2551"]
-
- auto-down-unreachable-after = 300s
- }
- }
-}
txContextFactory.close();
actorContext.shutdown();
- DistributedDataStoreFactory.destroyInstance(this);
}
public ActorContext getActorContext() {
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.osgi.BundleDelegatingClassLoader;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.ConfigFactory;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
public class DistributedDataStoreFactory {
- private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
- private static final String CONFIGURATION_NAME = "odl-cluster-data";
- private static ActorSystem actorSystem = null;
- private static final Set<DistributedDataStore> createdInstances = new HashSet<>(2);
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
- public static synchronized DistributedDataStore createInstance(SchemaService schemaService,
- DatastoreContext datastoreContext, BundleContext bundleContext) {
+ public static DistributedDataStore createInstance(SchemaService schemaService,
+ DatastoreContext datastoreContext, ActorSystem actorSystem, BundleContext bundleContext) {
LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreType());
DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
introspector, bundleContext);
- ActorSystem actorSystem = getActorSystem(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
dataStore.setCloseable(overlay);
dataStore.waitTillReady();
- createdInstances.add(dataStore);
return dataStore;
}
-
- private static synchronized final ActorSystem getActorSystem(final BundleContext bundleContext,
- AkkaConfigurationReader configurationReader) {
- if (actorSystem == null) {
- // Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
-
- actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME,
- ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
- actorSystem.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
- }
-
- return actorSystem;
- }
-
- public static synchronized void destroyInstance(DistributedDataStore dataStore){
- Preconditions.checkNotNull(dataStore, "dataStore should not be null");
-
- LOG.info("Destroy data store instance of type : {}", dataStore.getActorContext().getDataStoreType());
-
- if(createdInstances.remove(dataStore)){
- if(createdInstances.size() == 0){
- if(actorSystem != null) {
- actorSystem.shutdown();
- try {
- actorSystem.awaitTermination(Duration.create(10, TimeUnit.SECONDS));
- } catch (Exception e) {
- LOG.warn("Error awaiting actor termination", e);
- }
- actorSystem = null;
- }
- }
- }
- }
-
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.config.actor_system_provider.impl;
+
+import akka.actor.ActorSystem;
+import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.ActorSystemProviderListener;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
+import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.ListenerRegistry;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+public class ActorSystemProviderImpl implements ActorSystemProvider, AutoCloseable {
+ private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+ private static final String CONFIGURATION_NAME = "odl-cluster-data";
+ static final Logger LOG = LoggerFactory.getLogger(ActorSystemProviderImpl.class);
+
+ private ActorSystem actorSystem;
+ private final BundleDelegatingClassLoader classLoader;
+ private final ListenerRegistry<ActorSystemProviderListener> listeners = new ListenerRegistry<>();
+
+ public ActorSystemProviderImpl(BundleContext bundleContext) {
+ LOG.info("Creating new ActorSystem");
+
+ classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+ Thread.currentThread().getContextClassLoader());
+
+ createActorSystem();
+ }
+
+ private void createActorSystem() {
+ AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader();
+ actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME,
+ ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
+ }
+
+ @Override
+ public ActorSystem getActorSystem() {
+ return actorSystem;
+ }
+
+ @Override
+ public ListenerRegistration<ActorSystemProviderListener> registerActorSystemProviderListener(
+ ActorSystemProviderListener listener) {
+ return listeners.register(listener);
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Shutting down ActorSystem");
+
+ actorSystem.shutdown();
+ try {
+ actorSystem.awaitTermination(Duration.create(10, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ LOG.warn("Error awaiting actor termination", e);
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.config.yang.config.actor_system_provider.impl;
+
+import org.osgi.framework.BundleContext;
+
+public class ActorSystemProviderModule extends AbstractActorSystemProviderModule {
+ private BundleContext bundleContext;
+
+ public ActorSystemProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public ActorSystemProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, ActorSystemProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ return new ActorSystemProviderImpl(bundleContext);
+ }
+
+ public void setBundleContext(BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ }
+}
--- /dev/null
+/*
+* Generated file
+*
+* Generated from: yang module name: actor-system-provider-impl yang module local name: actor-system-provider-impl
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Tue Oct 06 02:11:27 EDT 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.config.actor_system_provider.impl;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+public class ActorSystemProviderModuleFactory extends AbstractActorSystemProviderModuleFactory {
+ @Override
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+ ActorSystemProviderModule module = (ActorSystemProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+ module.setBundleContext(bundleContext);
+ return module;
+ }
+
+ @Override
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+ ActorSystemProviderModule module = (ActorSystemProviderModule)super.createModule(instanceName, dependencyResolver,
+ old, bundleContext);
+ module.setBundleContext(bundleContext);
+ return module;
+ }
+}
.build();
return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
- datastoreContext, bundleContext);
+ datastoreContext, getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
.build();
return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
- datastoreContext, bundleContext);
+ datastoreContext, getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
--- /dev/null
+module actor-system-provider-impl {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:impl";
+ prefix "actor-system-impl";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import actor-system-provider-service {prefix actor-system;}
+
+ description "Akka actor system provider implementation";
+
+ revision "2015-10-05" {
+ description "Initial revision";
+ }
+
+ identity actor-system-provider-impl {
+ base config:module-type;
+ config:provided-service actor-system:actor-system-provider-service;
+ config:java-name-prefix ActorSystemProvider;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case actor-system-provider-impl {
+ when "/config:modules/config:module/config:type = 'actor-system-provider-impl'";
+ }
+ }
+}
\ No newline at end of file
import opendaylight-config-dom-datastore {prefix config-dom-store-spi;}
import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
import opendaylight-md-sal-dom {prefix sal;}
+ import actor-system-provider-service {prefix actor-system;}
description
"This module contains the base YANG definitions for
container config-schema-service {
uses config:service-ref {
refine type {
- mandatory false;
+ mandatory true;
config:required-identity sal:schema-service;
}
}
}
+ container config-actor-system-provider {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity actor-system:actor-system-provider-service;
+ }
+ }
+ }
+
container config-properties {
uses data-store-properties;
}
container operational-schema-service {
uses config:service-ref {
refine type {
- mandatory false;
+ mandatory true;
config:required-identity sal:schema-service;
}
}
}
+ container operational-actor-system-provider {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity actor-system:actor-system-provider-service;
+ }
+ }
+ }
+
container operational-properties {
uses data-store-properties;
}
package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
-import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
+import akka.actor.ActorSystem;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
import org.opendaylight.controller.sal.core.api.Broker;
import org.osgi.framework.BundleContext;
public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModule {
- private BundleContext bundleContext;
- public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
- super(identifier, dependencyResolver);
- }
-
- public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.remote_rpc_connector.RemoteRPCBrokerModule oldModule, java.lang.AutoCloseable oldInstance) {
- super(identifier, dependencyResolver, oldModule, oldInstance);
- }
-
- @Override
- public void customValidation() {
- // add custom validation form module attributes here.
- }
-
- @Override
- public boolean canReuseInstance(AbstractRemoteRPCBrokerModule oldModule) {
- return true;
- }
-
- @Override
- public java.lang.AutoCloseable createInstance() {
- Broker broker = getDomBrokerDependency();
-
- RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(getActorSystemName())
- .metricCaptureEnabled(getEnableMetricCapture())
- .mailboxCapacity(getBoundedMailboxCapacity())
- .withConfigReader(new FileAkkaConfigurationReader())
- .build();
-
- return RemoteRpcProviderFactory.createInstance(broker, bundleContext, config);
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
+ private BundleContext bundleContext;
+ public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.remote_rpc_connector.RemoteRPCBrokerModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public boolean canReuseInstance(AbstractRemoteRPCBrokerModule oldModule) {
+ return true;
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ Broker broker = getDomBrokerDependency();
+
+ ActorSystem actorSystem = getActorSystemProviderDependency().getActorSystem();
+ RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder(actorSystem.name())
+ .metricCaptureEnabled(getEnableMetricCapture())
+ .mailboxCapacity(getBoundedMailboxCapacity())
+ .build();
+
+ return RemoteRpcProviderFactory.createInstance(broker, bundleContext, actorSystem, config);
+ }
+
+ public void setBundleContext(BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
import java.util.Collection;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
private final RemoteRpcProviderConfig config;
- public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry) {
+ public RemoteRpcProvider(final ActorSystem actorSystem,
+ final DOMRpcProviderService rpcProvisionRegistry,
+ final RemoteRpcProviderConfig config) {
this.actorSystem = actorSystem;
this.rpcProvisionRegistry = rpcProvisionRegistry;
- config = new RemoteRpcProviderConfig(actorSystem.settings().config());
+ this.config = Preconditions.checkNotNull(config);
}
@Override
final DOMRpcService rpcService = brokerSession.getService(DOMRpcService.class);
schemaContext = schemaService.getGlobalContext();
rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext,
- rpcProvisionRegistry, rpcService), config.getRpcManagerName());
+ rpcProvisionRegistry, rpcService, config), config.getRpcManagerName());
schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
LOG.debug("rpc manager started");
}
import akka.util.Timeout;
import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
/**
*/
public class RemoteRpcProviderConfig extends CommonConfig {
package org.opendaylight.controller.remote.rpc;
import akka.actor.ActorSystem;
-import akka.osgi.BundleDelegatingClassLoader;
-import com.typesafe.config.Config;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.sal.core.api.Broker;
import org.osgi.framework.BundleContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class RemoteRpcProviderFactory {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProviderFactory.class);
+ public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext,
+ final ActorSystem actorSystem, final RemoteRpcProviderConfig config) {
- public static RemoteRpcProvider createInstance(
- final Broker broker, final BundleContext bundleContext, final RemoteRpcProviderConfig config){
+ final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(actorSystem, (DOMRpcProviderService) broker, config);
- final RemoteRpcProvider rpcProvider =
- new RemoteRpcProvider(createActorSystem(bundleContext, config), (DOMRpcProviderService) broker);
-
- broker.registerProvider(rpcProvider);
- return rpcProvider;
- }
-
- private static ActorSystem createActorSystem(final BundleContext bundleContext, final RemoteRpcProviderConfig config){
-
- // Create an OSGi bundle classloader for actor system
- final BundleDelegatingClassLoader classLoader =
- new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
-
- final Config actorSystemConfig = config.get();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Actor system configuration\n{}", actorSystemConfig.root().render());
- }
- if (config.isMetricCaptureEnabled()) {
- LOG.info("Instrumentation is enabled in actor system {}. Metrics can be viewed in JMX console.",
- config.getActorSystemName());
- }
-
- return ActorSystem.create(config.getActorSystemName(), actorSystemConfig, classLoader);
+ broker.registerProvider(rpcProvider);
+ return rpcProvider;
}
}
private RpcManager(final SchemaContext schemaContext,
final DOMRpcProviderService rpcProvisionRegistry,
- final DOMRpcService rpcSevices) {
+ final DOMRpcService rpcSevices,
+ final RemoteRpcProviderConfig config) {
this.schemaContext = schemaContext;
this.rpcProvisionRegistry = rpcProvisionRegistry;
rpcServices = rpcSevices;
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ this.config = config;
createRpcActors();
startListeners();
public static Props props(final SchemaContext schemaContext,
- final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
+ final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
+ final RemoteRpcProviderConfig config) {
Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
- return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
+ return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
}
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
rpcRegistry =
- getContext().actorOf(RpcRegistry.props().
+ getContext().actorOf(RpcRegistry.props(config).
withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
rpcBroker =
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
*/
public class RpcRegistry extends BucketStore<RoutingTable> {
- public RpcRegistry() {
+ public RpcRegistry(RemoteRpcProviderConfig config) {
+ super(config);
getLocalBucket().setData(new RoutingTable());
}
- public static Props props() {
- return Props.create(new RpcRegistryCreator());
+ public static Props props(RemoteRpcProviderConfig config) {
+ return Props.create(new RpcRegistryCreator(config));
}
@Override
private static class RpcRegistryCreator implements Creator<RpcRegistry> {
private static final long serialVersionUID = 1L;
+ private final RemoteRpcProviderConfig config;
+
+ private RpcRegistryCreator(RemoteRpcProviderConfig config) {
+ this.config = config;
+ }
@Override
public RpcRegistry create() throws Exception {
- RpcRegistry registry = new RpcRegistry();
+ RpcRegistry registry = new RpcRegistry(config);
RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
return registry;
}
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterActorRefProvider;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
private final RemoteRpcProviderConfig config;
- public BucketStore(){
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ public BucketStore(RemoteRpcProviderConfig config){
+ this.config = Preconditions.checkNotNull(config);
}
@Override
selfAddress = provider.getDefaultAddress();
if ( provider instanceof ClusterActorRefProvider) {
- getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+ getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
}
}
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
private Boolean autoStartGossipTicks = true;
- private RemoteRpcProviderConfig config;
+ private final RemoteRpcProviderConfig config;
- public Gossiper(){
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ public Gossiper(RemoteRpcProviderConfig config){
+ this.config = Preconditions.checkNotNull(config);
}
/**
* @param autoStartGossipTicks used for turning off gossip ticks during testing.
* Gossip tick can be manually sent.
*/
- public Gossiper(Boolean autoStartGossipTicks){
+ public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config){
+ this(config);
this.autoStartGossipTicks = autoStartGossipTicks;
}
import config { prefix config; revision-date 2013-04-05; }
import opendaylight-md-sal-dom {prefix dom;}
+ import actor-system-provider-service {prefix actor-system;}
description
"This module contains the base YANG definitions for
}
}
+ container actor-system-provider {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity actor-system:actor-system-provider-service;
+ }
+ }
+ }
+
leaf enable-metric-capture {
default false;
type boolean;
description "Enable or disable metric capture.";
}
- leaf actor-system-name {
- default odl-cluster-rpc;
- type string;
- description "Name by which actor system is identified. Its also used to find relevant configuration";
- }
-
leaf bounded-mailbox-capacity {
default 1000;
type uint16;
@Test
public void testRemoteRpcProvider() throws Exception {
- final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class));
+ final RemoteRpcProvider rpcProvider = new RemoteRpcProvider(system, mock(DOMRpcProviderService.class),
+ new RemoteRpcProviderConfig(system.settings().config()));
final Broker.ProviderSession session = mock(Broker.ProviderSession.class);
final SchemaService schemaService = mock(SchemaService.class);
when(schemaService.getGlobalContext()). thenReturn(mock(SchemaContext.class));
import akka.japi.Pair;
import akka.testkit.JavaTestKit;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
@BeforeClass
public static void staticSetup() throws InterruptedException {
- RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
- RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
- RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
- node1 = ActorSystem.create("opendaylight-rpc", config1.get());
- node2 = ActorSystem.create("opendaylight-rpc", config2.get());
- node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+ AkkaConfigurationReader reader = new AkkaConfigurationReader() {
+ @Override
+ public Config read() {
+ return ConfigFactory.load();
+ }
+ };
+
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").withConfigReader(reader).build();
+ RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").withConfigReader(reader).build();
+ RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").withConfigReader(reader).build();
+ node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+ node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+ node3 = ActorSystem.create("opendaylight-rpc", config3.get());
}
@AfterClass
public static void staticTeardown() {
- JavaTestKit.shutdownActorSystem(node1);
- JavaTestKit.shutdownActorSystem(node2);
- JavaTestKit.shutdownActorSystem(node3);
+ JavaTestKit.shutdownActorSystem(node1);
+ JavaTestKit.shutdownActorSystem(node2);
+ JavaTestKit.shutdownActorSystem(node3);
}
@Before
public void setup() {
- registry1 = node1.actorOf(Props.create(RpcRegistry.class));
- registry2 = node2.actorOf(Props.create(RpcRegistry.class));
- registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+ registry1 = node1.actorOf(Props.create(RpcRegistry.class, config(node1)));
+ registry2 = node2.actorOf(Props.create(RpcRegistry.class, config(node2)));
+ registry3 = node3.actorOf(Props.create(RpcRegistry.class, config(node3)));
+ }
+
+ private RemoteRpcProviderConfig config(ActorSystem node){
+ return new RemoteRpcProviderConfig(node.settings().config());
}
@After
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
public class BucketStoreTest {
* @return instance of BucketStore class
*/
private static BucketStore createStore(){
- final Props props = Props.create(BucketStore.class);
+ final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()));
final TestActorRef<BucketStore> testRef = TestActorRef.create(system, props, "testStore");
return testRef.underlyingActor();
}
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-
public class GossiperTest {
*/
private static Gossiper createGossiper(){
- final Props props = Props.create(Gossiper.class, false);
+ final Props props = Props.create(Gossiper.class, false, new RemoteRpcProviderConfig(system.settings().config()));
final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
return testRef.underlyingActor();