<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-common</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-common-util</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-api</artifactId>
<repository>mvn:org.opendaylight.yangtools/features-yangtools/${yangtools.version}/xml/features</repository>
<feature name='odl-config-all' version='${project.version}' description="OpenDaylight :: Config :: All">
- <feature version='${mdsal.version}'>odl-mdsal-common</feature>
<feature version='${project.version}'>odl-config-api</feature>
<feature version='${project.version}'>odl-config-netty-config-api</feature>
<feature version='${project.version}'>odl-config-core</feature>
<feature version='${project.version}'>odl-config-manager</feature>
</feature>
- <feature name='odl-mdsal-common' version='${mdsal.version}' description="OpenDaylight :: Config :: All">
- <feature version='${yangtools.version}'>odl-yangtools-data-binding</feature>
- <bundle>mvn:org.opendaylight.controller/sal-common/${mdsal.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/sal-common-api/${mdsal.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/sal-common-util/${mdsal.version}</bundle>
- </feature>
-
<feature name='odl-config-api' version='${project.version}' description="OpenDaylight :: Config :: API">
<bundle>mvn:org.opendaylight.controller/config-api/${project.version}</bundle>
<feature version='${yangtools.version}'>odl-yangtools-common</feature>
<feature version='${yangtools.version}'>odl-yangtools-common</feature>
<feature version='${yangtools.version}'>odl-yangtools-binding</feature>
<feature version='${yangtools.version}'>odl-yangtools-binding-generator</feature>
- <feature version='${mdsal.version}'>odl-mdsal-common</feature>
<feature version='${project.version}'>odl-config-api</feature>
<bundle>mvn:org.opendaylight.controller/config-util/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller/yang-jmx-generator/${project.version}</bundle>
<feature version='${project.version}'>odl-config-core</feature>
<bundle>mvn:org.opendaylight.controller/config-manager/${project.version}</bundle>
</feature>
-</features>
\ No newline at end of file
+</features>
<artifactId>sal-akka-raft</artifactId>
<version>${mdsal.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-core-spi</artifactId>
<feature version='${project.version}'>odl-mdsal-xsql</feature>
<feature version='${project.version}'>odl-toaster</feature>
</feature>
+ <feature name='odl-mdsal-common' version='${mdsal.version}' description="OpenDaylight :: Config :: All">
+ <feature version='${yangtools.version}'>odl-yangtools-data-binding</feature>
+ <bundle>mvn:org.opendaylight.controller/sal-common/${mdsal.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/sal-common-api/${mdsal.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/sal-common-util/${mdsal.version}</bundle>
+ </feature>
<feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
<feature version='${yangtools.version}'>odl-yangtools-common</feature>
<feature version='${yangtools.version}'>odl-yangtools-binding</feature>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.opendaylight.odlparent</groupId>
+ <artifactId>odlparent</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath/>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>karaf-parent</artifactId>
+ <name>${project.artifactId}</name>
+ <packaging>pom</packaging>
+ <prerequisites>
+ <maven>3.1.1</maven>
+ </prerequisites>
+ <properties>
+ <branding.version>1.1.0-SNAPSHOT</branding.version>
+ <karaf.resources.version>1.5.0-SNAPSHOT</karaf.resources.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <!-- scope is compile so all features (there is only one) are installed
+ into startup.properties and the feature repo itself is not installed -->
+ <groupId>org.apache.karaf.features</groupId>
+ <artifactId>framework</artifactId>
+ <version>${karaf.version}</version>
+ <type>kar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <dependencies>
+ <!-- ODL Branding -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>karaf.branding</artifactId>
+ <version>${branding.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Resources needed -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-resources</artifactId>
+ <version>${karaf.resources.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>cleanVersions</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>copy</goal>
+ <goal>unpack</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.karaf.tooling</groupId>
+ <artifactId>karaf-maven-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>commands-generate-help</goal>
+ <goal>features-add-to-repository</goal>
+ <goal>install-kars</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.fusesource.scalate</groupId>
+ <artifactId>maven-scalate-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>sitegen</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.servicemix.tooling</groupId>
+ <artifactId>depends-maven-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>generate-depends-file</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <!-- here the phase you need -->
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/assembly</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/assembly</directory>
+ </resource>
+ </resources>
+ <overwrite>true</overwrite>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.karaf.tooling</groupId>
+ <artifactId>karaf-maven-plugin</artifactId>
+ <version>${karaf.version}</version>
+ <extensions>true</extensions>
+ <configuration>
+ <!-- no startupFeatures -->
+ <bootFeatures>
+ <feature>standard</feature>
+ <feature>${karaf.localFeature}</feature>
+ </bootFeatures>
+ <!-- no installedFeatures -->
+ </configuration>
+ <executions>
+ <execution>
+ <id>populate-system</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>features-add-to-repository</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>mvn:org.apache.karaf.features/standard/${karaf.version}/xml/features</descriptor>
+ </descriptors>
+ <features>
+ <feature>standard</feature>
+ <feature>config</feature>
+ <feature>package</feature>
+ <feature>kar</feature>
+ <feature>ssh</feature>
+ <feature>management</feature>
+ <feature>war</feature>
+ </features>
+ <repository>target/assembly/system</repository>
+ </configuration>
+ </execution>
+ <execution>
+ <id>process-resources</id>
+ <goals>
+ <goal>install-kars</goal>
+ </goals>
+ <phase>process-resources</phase>
+ </execution>
+ <execution>
+ <id>package</id>
+ <goals>
+ <goal>instance-create-archive</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${checkstyle.version}</version>
+ <configuration>
+ <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/configuration\/initial\/</excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <!-- here the phase you need -->
+ <phase>generate-resources</phase>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>karaf.branding</artifactId>
+ <version>${karaf.branding.version}</version>
+ <outputDirectory>target/assembly/lib</outputDirectory>
+ <destFileName>karaf.branding-${branding.version}.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ <execution>
+ <id>unpack-karaf-resources</id>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ <configuration>
+ <outputDirectory>${project.build.directory}/assembly</outputDirectory>
+ <groupId>org.opendaylight.controller</groupId>
+ <includeArtifactIds>opendaylight-karaf-resources</includeArtifactIds>
+ <excludes>META-INF\/**</excludes>
+ <excludeTransitive>true</excludeTransitive>
+ <ignorePermissions>false</ignorePermissions>
+ </configuration>
+ </execution>
+ <execution>
+ <id>org.ops4j.pax.url.mvn.cfg</id>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-resources</artifactId>
+ <type>properties</type>
+ <classifier>config</classifier>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/assembly/etc/</outputDirectory>
+ <destFileName>org.ops4j.pax.url.mvn.cfg</destFileName>
+ </artifactItem>
+ </artifactItems>
+ <overWriteReleases>true</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <tasks>
+ <chmod perm="755">
+ <fileset dir="${project.build.directory}/assembly/bin">
+ <include name="karaf"/>
+ <include name="instance"/>
+ <include name="start"/>
+ <include name="stop"/>
+ <include name="status"/>
+ <include name="client"/>
+ <include name="shell"/>
+ </fileset>
+ </chmod>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.karaf.tooling</groupId>
+ <artifactId>karaf-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
<groupId>org.opendaylight.controller</groupId>
<artifactId>commons.opendaylight</artifactId>
<version>1.5.0-SNAPSHOT</version>
- <relativePath>../../commons/opendaylight</relativePath>
+ <relativePath>../../opendaylight/commons/opendaylight</relativePath>
</parent>
<artifactId>opendaylight-karaf-empty</artifactId>
<packaging>pom</packaging>
<groupId>org.opendaylight.controller</groupId>
<artifactId>commons.opendaylight</artifactId>
<version>1.5.0-SNAPSHOT</version>
- <relativePath>../../commons/opendaylight</relativePath>
+ <relativePath>../../opendaylight/commons/opendaylight</relativePath>
</parent>
<artifactId>opendaylight-karaf-resources</artifactId>
<description>Resources for opendaylight-karaf</description>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg</file>
+ <type>properties</type>
+ <classifier>config</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
--- /dev/null
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+#
+# If set to true, the following property will not allow any certificate to be used
+# when accessing Maven repositories through SSL
+#
+#org.ops4j.pax.url.mvn.certificateCheck=
+
+#
+# Path to the local Maven settings file.
+# The repositories defined in this file will be automatically added to the list
+# of default repositories if the 'org.ops4j.pax.url.mvn.repositories' property
+# below is not set.
+# The following locations are checked for the existence of the settings.xml file
+# * 1. looks for the specified url
+# * 2. if not found looks for ${user.home}/.m2/settings.xml
+# * 3. if not found looks for ${maven.home}/conf/settings.xml
+# * 4. if not found looks for ${M2_HOME}/conf/settings.xml
+#
+#org.ops4j.pax.url.mvn.settings=
+
+#
+# Path to the local Maven repository which is used to avoid downloading
+# artifacts when they already exist locally.
+# The value of this property will be extracted from the settings.xml file
+# above, or defaulted to:
+# System.getProperty( "user.home" ) + "/.m2/repository"
+#
+org.ops4j.pax.url.mvn.localRepository=${karaf.home}/${karaf.default.repository}
+
+#
+# Default this to false. It's just weird to use undocumented repos
+#
+org.ops4j.pax.url.mvn.useFallbackRepositories=false
+
+#
+# Uncomment if you don't wanna use the proxy settings
+# from the Maven conf/settings.xml file
+#
+# org.ops4j.pax.url.mvn.proxySupport=false
+
+#
+# Disable aether support by default. This ensure that the defaultRepositories
+# below will be used
+#
+#org.ops4j.pax.url.mvn.disableAether=true
+
+#
+# Comma separated list of repositories scanned when resolving an artifact.
+# Those repositories will be checked before iterating through the
+# below list of repositories and even before the local repository
+# A repository url can be appended with zero or more of the following flags:
+# @snapshots : the repository contains snaphots
+# @noreleases : the repository does not contain any released artifacts
+#
+# The following property value will add the system folder as a repo.
+#
+#org.ops4j.pax.url.mvn.defaultRepositories=
+
+# Use the default local repo (e.g.~/.m2/repository) as a "remote" repo
+org.ops4j.pax.url.mvn.defaultLocalRepoAsRemote=false
+
+#
+# Comma separated list of repositories scanned when resolving an artifact.
+# The default list includes the following repositories containing releases:
+# http://repo1.maven.org/maven2
+# http://repository.apache.org/content/groups/snapshots-group
+# http://svn.apache.org/repos/asf/servicemix/m2-repo
+# http://repository.springsource.com/maven/bundles/release
+# http://repository.springsource.com/maven/bundles/external
+# To add repositories to the default ones, prepend '+' to the list of repositories
+# to add.
+# A repository url can be appended with zero or more of the following flags:
+# @snapshots : the repository contains snaphots
+# @noreleases : the repository does not contain any released artifacts
+# @id=reponid : the id for the repository, just like in the settings.xml this is optional but recomendet
+#
+# The default list doesn't contain any repository containing snapshots as it can impact the artifacts resolution.
+# You may want to add the following repositories containing snapshots:
+# http://repository.apache.org/content/groups/snapshots-group@id=apache@snapshots@noreleases
+# http://oss.sonatype.org/content/repositories/snapshots@id=sonatype.snapshots.deploy@snapshots@norelease
+# http://oss.sonatype.org/content/repositories/ops4j-snapshots@id=ops4j.sonatype.snapshots.deploy@snapshots@noreleases
+#
+org.ops4j.pax.url.mvn.repositories= \
+ file:${karaf.home}/${karaf.default.repository}@id=system.repository, \
+ file:${karaf.data}/kar@id=kar.repository@multi, \
+ http://repo1.maven.org/maven2@id=central, \
+ http://repository.springsource.com/maven/bundles/release@id=spring.ebr.release, \
+ http://repository.springsource.com/maven/bundles/external@id=spring.ebr.external
<groupId>org.opendaylight.controller</groupId>
<artifactId>commons.opendaylight</artifactId>
<version>1.5.0-SNAPSHOT</version>
- <relativePath>../../commons/opendaylight</relativePath>
+ <relativePath>../../opendaylight/commons/opendaylight</relativePath>
</parent>
<artifactId>distribution.opendaylight-karaf</artifactId>
<packaging>pom</packaging>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../opendaylight/commons/parent</relativePath>
+ </parent>
+
+ <artifactId>karaf-aggregator</artifactId>
+ <packaging>pom</packaging>
+ <modules>
+ <module>karaf-branding</module>
+ <module>karaf-parent</module>
+ <module>opendaylight-karaf</module>
+ <module>opendaylight-karaf-empty</module>
+ <module>opendaylight-karaf-resources</module>
+ </modules>
+</project>
-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
import org.opendaylight.controller.sal.topology.ITopologyService;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
props.put("cachenames", propSet);
c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
+ IInventoryListener.class.getName(),
ITopologyManager.class.getName(),
ITopologyManagerShell.class.getName(),
IConfigurationContainerAware.class.getName(),
import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
IConfigurationContainerAware,
IListenTopoUpdates,
IObjectReader,
+ IInventoryListener,
CommandProvider {
protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+ private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
private ITopologyService topoService;
private IClusterContainerServices clusterContainerService;
private IConfigurationContainerService configurationService;
private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
private volatile Boolean shuttingDown = false;
private Thread notifyThread;
+ private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+ new HashMap<NodeConnector, List<PendingUpdateTask>>();
+ private final BlockingQueue<TopoEdgeUpdate> updateQ =
+ new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private Timer pendingTimer;
+ private Thread updateThread;
+
+ private class PendingEdgeUpdate extends TopoEdgeUpdate {
+ private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+ super(e, p, t);
+ }
+ }
+
+ private class UpdateTopology implements Runnable {
+ @Override
+ public void run() {
+ log.trace("Start topology update thread");
+
+ while (!shuttingDown) {
+ try {
+ List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+ TopoEdgeUpdate teu = updateQ.take();
+ for (; teu != null; teu = updateQ.poll()) {
+ list.add(teu);
+ }
+
+ if (!list.isEmpty()) {
+ log.trace("Update edges: {}", list);
+ doEdgeUpdate(list);
+ }
+ } catch (InterruptedException e) {
+ if (shuttingDown) {
+ break;
+ }
+ log.warn("Topology update thread interrupted", e);
+ } catch (Exception e) {
+ log.error("Exception on topology update thread", e);
+ }
+ }
+
+ log.trace("Exit topology update thread");
+ }
+ }
+
+ private class PendingUpdateTask extends TimerTask {
+ private final Edge edge;
+ private final Set<Property> props;
+ private final UpdateType type;
+
+ private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+ edge = e;
+ props = p;
+ type = t;
+ }
+
+ private NodeConnector getHeadNodeConnector() {
+ return edge.getHeadNodeConnector();
+ }
+
+ private void flush() {
+ log.info("Flush pending topology update: edge {}, type {}",
+ edge, type);
+ updateQ.add(new PendingEdgeUpdate(edge, props, type));
+ }
+ @Override
+ public void run() {
+ if (removePendingEvent(this)) {
+ log.warn("Pending topology update timed out: edge{}, type {}",
+ edge, type);
+ }
+ }
+ }
void nonClusterObjectCreate() {
edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
// Restore the shuttingDown status on init of the component
shuttingDown = false;
notifyThread = new Thread(new TopologyNotify(notifyQ));
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
}
@SuppressWarnings({ "unchecked" })
*
*/
void started() {
+ updateThread.start();
+
// Start the batcher thread for the cluster wide topology updates
notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
void stop() {
shuttingDown = true;
+ updateThread.interrupt();
notifyThread.interrupt();
+ pendingTimer.cancel();
}
/**
*
*/
void destroy() {
+ updateQ.clear();
+ updateThread = null;
+ pendingTimer = null;
notifyQ.clear();
notifyThread = null;
}
return (switchManager.doesNodeConnectorExist(head));
}
+ private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list == null) {
+ list = new LinkedList<PendingUpdateTask>();
+ pendingUpdates.put(head, list);
+ }
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ }
+ }
+
+ private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ log.warn("Enqueue edge update: edge {}, type {}", e, t);
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean removePendingEvent(PendingUpdateTask t) {
+ t.cancel();
+ NodeConnector head = t.getHeadNodeConnector();
+ boolean removed = false;
+
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ removed = list.remove(t);
+ if (list.isEmpty()) {
+ pendingUpdates.remove(head);
+ }
+ }
+ }
+
+ return removed;
+ }
+
+ private void removePendingEvent(NodeConnector head, boolean doFlush) {
+ List<PendingUpdateTask> list;
+ synchronized (pendingUpdates) {
+ list = pendingUpdates.remove(head);
+ }
+
+ if (list != null) {
+ for (PendingUpdateTask task : list) {
+ if (task.cancel() && doFlush) {
+ task.flush();
+ }
+ }
+ pendingTimer.purge();
+ }
+ }
+
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
- switch (type) {
- case ADDED:
+ return edgeUpdate(e, type, props, false);
+ }
+ private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+ if (!type.equals(UpdateType.ADDED) &&
+ enqueueEventIfPending(e, props, type)) {
+ return null;
+ }
+ switch (type) {
+ case ADDED:
if (this.edgesDB.containsKey(e)) {
// Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
log.trace("Skipping redundant edge addition: {}", e);
return null;
}
+ // Ensure that head node connector exists
+ if (!isPending) {
+ if (headNodeConnectorExist(e)) {
+ removePendingEvent(e.getHeadNodeConnector(), true);
+ } else {
+ log.warn("Ignore edge that contains invalid node connector: {}",
+ e);
+ addPendingEvent(e, props, type);
+ return null;
+ }
+ }
+
// Make sure the props are non-null or create a copy
if (props == null) {
props = new HashSet<Property>();
props = new HashSet<Property>(props);
}
-
- // Ensure that head node connector exists
- if (!headNodeConnectorExist(e)) {
- log.warn("Ignore edge that contains invalid node connector: {}", e);
- return null;
- }
-
// Check if nodeConnectors of the edge were correctly categorized
// by protocol plugin
crossCheckNodeConnectors(e);
return new TopoEdgeUpdate(e, props, type);
}
- @Override
- public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
- for (int i = 0; i < topoedgeupdateList.size(); i++) {
- Edge e = topoedgeupdateList.get(i).getEdge();
- Set<Property> p = topoedgeupdateList.get(i).getProperty();
- UpdateType type = topoedgeupdateList.get(i).getUpdateType();
- TopoEdgeUpdate teu = edgeUpdate(e, type, p);
- if (teu != null) {
- teuList.add(teu);
+ for (TopoEdgeUpdate teu : topoedgeupdateList) {
+ boolean isPending = (teu instanceof PendingEdgeUpdate);
+ Edge e = teu.getEdge();
+ Set<Property> p = teu.getProperty();
+ UpdateType type = teu.getUpdateType();
+ TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+ if (update != null) {
+ teuList.add(update);
}
}
}
}
+ @Override
+ public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ updateQ.addAll(topoedgeupdateList);
+ }
+
private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
notifyQ.add(upd);
}
+ @Override
+ public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+ // NOP
+ }
+
+ @Override
+ public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+ // Remove pending edge updates for the given node connector.
+ // Pending events should be notified if the node connector exists.
+ boolean doFlush = !type.equals(UpdateType.REMOVED);
+ removePendingEvent(nc, doFlush);
+ }
+
@Override
public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
if (cacheName.equals(TOPOEDGESDB)) {
return result;
}
+ // Only for unit test.
+ void startTest() {
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
+ updateThread.start();
+ }
+
+ void stopTest() {
+ shuttingDown = true;
+ updateThread.interrupt();
+ pendingTimer.cancel();
+ }
+
+ boolean flushUpdateQueue(long timeout) {
+ long limit = System.currentTimeMillis() + timeout;
+ long cur;
+ do {
+ if (updateQ.peek() == null) {
+ return true;
+ }
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ cur = System.currentTimeMillis();
+ } while (cur < limit);
+
+ return false;
+ }
}
package org.opendaylight.controller.topologymanager.internal;
import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.core.Bandwidth;
import org.opendaylight.controller.sal.core.ConstructionException;
import java.util.concurrent.ConcurrentMap;
public class TopologyManagerImplTest {
+ private TopologyManagerImpl topoManagerImpl;
+
/**
* Mockup of switch manager that only maintains existence of node
* connector.
}
}
+ private void clear() {
+ nodeSet.clear();
+ nodeConnectorSet.clear();
+ }
+
@Override
public Status addSubnet(SubnetConfig configObject) {
return null;
}
}
+ @Before
+ public void setUp() {
+ topoManagerImpl = new TopologyManagerImpl();
+ topoManagerImpl.startTest();
+ }
+
+ @After
+ public void tearDown() {
+ if (topoManagerImpl != null) {
+ topoManagerImpl.stopTest();
+ topoManagerImpl = null;
+ }
+ }
+
/*
* Sets the node, edges and properties for edges here: Edge <SwitchId :
* NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
topoedgeupdateList.add(teu2);
topoManagerImpl.edgeUpdate(topoedgeupdateList);
}
+
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
}
@Test
public void testGetNodeEdges() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
setNodeEdges(topoManagerImpl, swMgr);
@Test
public void testGetEdges() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
setNodeEdges(topoManagerImpl, swMgr);
TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
"OF|10@OF|20", "OF|10@OF|30");
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
public void testGetUserLink() {
TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void testHostLinkMethods() throws ConstructionException,
UnknownHostException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void testGetNodesWithNodeConnectorHost()
throws ConstructionException, UnknownHostException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void bug1348FixTest() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
}
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
Assert.assertEquals(1, topoManagerImpl.getEdges().size());
Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
}
+
+ @Test
+ public void testNotifyNodeConnector() throws ConstructionException {
+ TestSwitchManager swMgr = new TestSwitchManager();
+ topoManagerImpl.setSwitchManager(swMgr);
+ topoManagerImpl.nonClusterObjectCreate();
+
+ // Test NodeConnector notification in the case that there are no
+ // related edge updates.
+ NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector(
+ (short) 1, NodeCreator.createOFNode(1000L));
+ Map<String, Property> propMap = new HashMap<>();
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ swMgr.clear();
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ // Test NodeConnector notification in the case that there is a related
+ // edge update just before the notification.
+ NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector(
+ (short) 2, NodeCreator.createOFNode(2000L));
+ Edge edge1 = new Edge(nc1, nc2);
+ Edge edge2 = new Edge(nc2, nc1);
+ Set<Property> props = new HashSet<Property>();
+ TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+ TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+ List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ swMgr.addNodeConnectors(nc2);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(2, topoManagerImpl.getEdges().size());
+
+ teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+ teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+ topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+
+ swMgr.clear();
+
+ // Test NodeConnector notification in the case that there are multiple
+ // edge updates related to the NodeConnector just before the notification.
+ teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+ teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+ TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED);
+ TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED);
+ TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+ TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+ topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoedgeupdateList.add(teu3);
+ topoedgeupdateList.add(teu4);
+ topoedgeupdateList.add(teu5);
+ topoedgeupdateList.add(teu6);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ swMgr.addNodeConnectors(nc2);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ }
}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>binding-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath/>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-parent</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <properties>
+ <config.version>0.3.0-SNAPSHOT</config.version>
+ <mdsal.version>1.2.0-SNAPSHOT</mdsal.version>
+ <jmxGeneratorPath>src/main/yang-gen-config</jmxGeneratorPath>
+ <config.file>src/main/config/default-config.xml</config.file>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <!-- project specific dependencies -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-artifacts</artifactId>
+ <version>${config.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>mdsal-artifacts</artifactId>
+ <version>${mdsal.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>config-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-config</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>yang-jmx-generator-plugin</artifactId>
+ <version>${config.version}</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>config</id>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ <configuration>
+ <codeGenerators>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+ <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+ <additionalConfiguration>
+ <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+ </additionalConfiguration>
+ </generator>
+ </codeGenerators>
+ <inspectDependencies>true</inspectDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>${jmxGeneratorPath}</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ <profiles>
+ <profile>
+ <activation>
+ <file>
+ <exists>${config.file}</exists>
+ </file>
+ </activation>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${config.file}</file>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
<module>config-netty-config</module>
<module>config-artifacts</module>
+ <module>config-parent</module>
</modules>
<dependencies>
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import akka.japi.Creator;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
this.target = target;
}
- public static Props props(final ActorRef target){
- return Props.create(new Creator<ClientActor>(){
- private static final long serialVersionUID = 1L;
-
- @Override public ClientActor create() throws Exception {
- return new ClientActor(target);
- }
- });
+ public static Props props(final ActorRef target) {
+ return Props.create(ClientActor.class, target);
}
@Override public void onReceive(Object message) throws Exception {
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.japi.Creator;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- final Optional<ConfigParams> configParams){
- return Props.create(new Creator<ExampleActor>(){
-
- @Override public ExampleActor create() throws Exception {
- return new ExampleActor(id, peerAddresses, configParams);
- }
- });
+ final Optional<ConfigParams> configParams) {
+ return Props.create(ExampleActor.class, id, peerAddresses, configParams);
}
@Override public void onReceiveCommand(Object message) throws Exception{
package org.opendaylight.controller.cluster.example;
-import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.japi.Creator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
}
public static Props getProps(final String memberName) {
- return Props.create(new Creator<Actor>() {
- @Override
- public Actor create() throws Exception {
- return new ExampleRoleChangeListener(memberName);
- }
- });
+ return Props.create(ExampleRoleChangeListener.class, memberName);
}
@Override
package org.opendaylight.controller.cluster.notifications;
-import akka.actor.Actor;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.japi.Creator;
import akka.serialization.Serialization;
import com.google.common.collect.Maps;
import java.util.Map;
}
public static Props getProps(final String memberId) {
- return Props.create(new Creator<Actor>() {
- @Override
- public Actor create() throws Exception {
- return new RoleChangeNotifier(memberId);
- }
- });
+ return Props.create(RoleChangeNotifier.class, memberId);
}
@Override
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.actor.UntypedActor;
-import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
}
public static Props props(final ReentrantLock lock){
- return Props.create(new Creator<PingPongActor>(){
- private static final long serialVersionUID = 1L;
- @Override
- public PingPongActor create() throws Exception {
- return new PingPongActor(lock);
- }
- });
+ return Props.create(PingPongActor.class, lock);
}
@Override
LOG.debug("Sending change notification {} to listener {}", change, listener);
- this.listener.onDataChanged(change);
+ try {
+ this.listener.onDataChanged(change);
+ } catch (RuntimeException e) {
+ LOG.error( String.format( "Error notifying listener %s", this.listener ), e );
+ }
// It seems the sender is never null but it doesn't hurt to check. If the caller passes in
// a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class DataChangeListenerTest extends AbstractActorTest {
}
}};
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testDataChangedWithListenerRuntimeEx(){
+ new JavaTestKit(getSystem()) {{
+ AsyncDataChangeEvent mockChangeEvent1 = Mockito.mock(AsyncDataChangeEvent.class);
+ AsyncDataChangeEvent mockChangeEvent2 = Mockito.mock(AsyncDataChangeEvent.class);
+ AsyncDataChangeEvent mockChangeEvent3 = Mockito.mock(AsyncDataChangeEvent.class);
+
+ AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+ Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
+
+ Props props = DataChangeListener.props(mockListener);
+ ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
+
+ // Let the DataChangeListener know that notifications should be enabled
+ subject.tell(new EnableNotification(true), getRef());
+
+ SchemaContext schemaContext = CompositeModel.createTestContext();
+
+ subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+ expectMsgClass(DataChangedReply.class);
+
+ subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+ expectMsgClass(DataChangedReply.class);
+
+ subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+ expectMsgClass(DataChangedReply.class);
+
+ Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
+ Mockito.verify(mockListener).onDataChanged(mockChangeEvent2);
+ Mockito.verify(mockListener).onDataChanged(mockChangeEvent3);
+ }};
+ }
}
config.getRpcManagerName());
LOG.debug("rpc manager started");
+
+ schemaService.registerSchemaContextListener(this);
}
@Override
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Pair;
+
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic;
import org.opendaylight.controller.remote.rpc.utils.RoutingLogic;
private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
private final Broker.ProviderSession brokerSession;
private final ActorRef rpcRegistry;
- private final SchemaContext schemaContext;
+ private SchemaContext schemaContext;
private final RemoteRpcProviderConfig config;
private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry,
invokeRemoteRpc((InvokeRpc) message);
} else if(message instanceof ExecuteRpc) {
executeRpc((ExecuteRpc) message);
+ } else if(message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
}
}
+ private void updateSchemaContext(UpdateSchemaContext message) {
+ this.schemaContext = message.getSchemaContext();
+ }
+
private void invokeRemoteRpc(final InvokeRpc msg) {
if(LOG.isDebugEnabled()) {
LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc());
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
-import akka.japi.Creator;
import akka.japi.Function;
+import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
-import java.util.Set;
-
/**
* This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
*
}
- public static Props props(final SchemaContext schemaContext,
- final Broker.ProviderSession brokerSession,
- final RpcProvisionRegistry rpcProvisionRegistry) {
- return Props.create(new Creator<RpcManager>() {
- private static final long serialVersionUID = 1L;
- @Override
- public RpcManager create() throws Exception {
- return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry);
- }
- });
- }
+ public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession,
+ final RpcProvisionRegistry rpcProvisionRegistry) {
+ return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry);
+ }
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
private void updateSchemaContext(UpdateSchemaContext message) {
this.schemaContext = message.getSchemaContext();
+ rpcBroker.tell(message, ActorRef.noSender());
}
@Override
import akka.actor.ActorRef;
import akka.japi.Option;
import akka.japi.Pair;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
public class RoutingTable implements Copier<RoutingTable>, Serializable {
private static final long serialVersionUID = 1L;
- private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+ private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
private ActorRef router;
@Override
public RoutingTable copy() {
RoutingTable copy = new RoutingTable();
- copy.setTable(new HashMap<>(table));
+ copy.table.putAll(table);
copy.setRouter(this.getRouter());
return copy;
public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
Long updatedTime = table.get(routeId);
- if (updatedTime == null || router == null)
+ if (updatedTime == null || router == null) {
return Option.none();
- else
+ } else {
return Option.option(new Pair<>(router, updatedTime));
+ }
}
public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
table.remove(routeId);
}
- public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
return table.containsKey(routeId);
}
- public Boolean isEmpty(){
+ public boolean isEmpty(){
return table.isEmpty();
}
- ///
- /// Getter, Setters
- ///
- //TODO: Remove public
- public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
- return table;
- }
- void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
- this.table = table;
+ public int size() {
+ return table.size();
}
public ActorRef getRouter() {
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Option;
import akka.japi.Pair;
-import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
+public class RpcRegistry extends BucketStore<RoutingTable> {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- /**
- * Store to keep the registry. Bucket store sync's it across nodes in the cluster
- */
- private ActorRef bucketStore;
-
- /**
- * Rpc broker that would use the registry to route requests.
- */
- private ActorRef localRouter;
-
- private RemoteRpcProviderConfig config;
-
public RpcRegistry() {
- bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
- this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
- log.info("Bucket store path = {}", bucketStore.path().toString());
+ getLocalBucket().setData(new RoutingTable());
}
- public RpcRegistry(ActorRef bucketStore) {
- this.bucketStore = bucketStore;
- }
-
-
@Override
protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
- if (message instanceof SetLocalRouter)
+ if (message instanceof SetLocalRouter) {
receiveSetLocalRouter((SetLocalRouter) message);
-
- if (message instanceof AddOrUpdateRoutes)
+ } else if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
-
- else if (message instanceof RemoveRoutes)
+ } else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
-
- else if (message instanceof Messages.FindRouters)
+ } else if (message instanceof Messages.FindRouters) {
receiveGetRouter((FindRouters) message);
-
- else
- unhandled(message);
+ } else {
+ super.handleReceive(message);
+ }
}
/**
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
private void receiveSetLocalRouter(SetLocalRouter message) {
- localRouter = message.getRouter();
+ getLocalBucket().getData().setRouter(message.getRouter());
}
/**
*/
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
- Preconditions.checkState(localRouter != null, "Router must be set first");
+ log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+
+ RoutingTable table = getLocalBucket().getData().copy();
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.addRoute(routeId);
+ }
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
- futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ updateLocalBucket(table);
}
/**
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
- futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+ RoutingTable table = getLocalBucket().getData().copy();
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ table.removeRoute(routeId);
+ }
+ updateLocalBucket(table);
}
/**
* @param msg
*/
private void receiveGetRouter(FindRouters msg) {
- final ActorRef sender = getSender();
-
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
- futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
- }
-
- /**
- * Helper to create empty reply when no routers are found
- *
- * @return
- */
- private Messages.FindRoutersReply createEmptyReply() {
- List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
- return new Messages.FindRoutersReply(routerWithUpdateTime);
- }
-
- /**
- * Helper to create a reply when routers are found for the given rpc
- *
- * @param buckets
- * @param routeId
- * @return
- */
- private Messages.FindRoutersReply createReplyWithRouters(
- Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
- Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
- for (Bucket bucket : buckets.values()) {
-
- RoutingTable table = (RoutingTable) bucket.getData();
- if (table == null)
- continue;
- routerWithUpdateTime = table.getRouterFor(routeId);
- if (routerWithUpdateTime.isEmpty())
- continue;
+ RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+ findRoutes(getLocalBucket().getData(), routeId, routers);
- routers.add(routerWithUpdateTime.get());
+ for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+ findRoutes(bucket.getData(), routeId, routers);
}
- return new Messages.FindRoutersReply(routers);
- }
-
-
- ///
- ///private factories to create Mapper
- ///
-
- /**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
- *
- * @param routeId the rpc
- * @param sender client who asked to find the routers.
- * @return
- */
- private Mapper<Object, Void> getMapperToGetRouter(
- final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
-
- if (replyMessage instanceof GetAllBucketsReply) {
-
- GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
- Map<Address, Bucket> buckets = reply.getBuckets();
-
- if (buckets == null || buckets.isEmpty()) {
- sender.tell(createEmptyReply(), getSelf());
- return null;
- }
-
- sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to remote
- * @return
- */
- private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
-
- if (!table.isEmpty()) {
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.removeRoute(routeId);
- }
- }
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
- return null;
- }
- };
+ getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
}
- /**
- * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
- * it updates the local bucket in bucket store.
- *
- * @param routeIds rpc to add
- * @return
- */
- private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetLocalBucketReply) {
-
- GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
- Bucket<RoutingTable> bucket = reply.getBucket();
-
- if (bucket == null) {
- log.debug("Local bucket is null");
- return null;
- }
-
- RoutingTable table = bucket.getData();
- if (table == null)
- table = new RoutingTable();
-
- table.setRouter(localRouter);
- for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
- table.addRoute(routeId);
- }
-
- bucket.setData(table);
-
- UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
- bucketStore.tell(updateBucketMessage, getSelf());
- }
+ private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+ List<Pair<ActorRef, Long>> routers) {
+ if (table == null) {
+ return;
+ }
- return null;
- }
- };
+ Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+ if(!routerWithUpdateTime.isEmpty()) {
+ routers.add(routerWithUpdateTime.get());
+ }
}
/**
public interface Bucket<T extends Copier<T>> {
public Long getVersion();
public T getData();
- public void setData(T data);
}
private T data;
+ public BucketImpl() {
+ }
+
+ public BucketImpl(T data) {
+ this.data = data;
+ }
+
+ public BucketImpl(Bucket<T> other) {
+ this.version = other.getVersion();
+ this.data = other.getData();
+ }
+
+ public void setData(T data) {
+ this.data = data;
+ this.version = System.currentTimeMillis()+1;
+ }
+
@Override
public Long getVersion() {
return version;
@Override
public T getData() {
- if (this.data == null)
- return null;
-
- return data.copy();
- }
-
- public void setData(T data){
- this.version = System.currentTimeMillis()+1;
- this.data = data;
+ return data;
}
@Override
import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+
+ private static final Long NO_VERSION = -1L;
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Bucket owned by the node
*/
- private BucketImpl localBucket = new BucketImpl();
+ private final BucketImpl<T> localBucket = new BucketImpl<>();
/**
* Buckets ownded by other known nodes in the cluster
*/
- private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+ private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
/**
* Bucket version for every known node in the cluster including this node
*/
- private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+ private final Map<Address, Long> versions = new HashMap<>();
/**
* Cluster address for this node
}
}
-
@Override
protected void handleReceive(Object message) throws Exception {
if (probe != null) {
probe = (ConditionalProbe) message;
// Send back any message to tell the caller we got the probe.
getSender().tell("Got it", getSelf());
- } else if (message instanceof UpdateBucket) {
- receiveUpdateBucket(((UpdateBucket) message).getBucket());
} else if (message instanceof GetAllBuckets) {
- receiveGetAllBucket();
- } else if (message instanceof GetLocalBucket) {
- receiveGetLocalBucket();
+ receiveGetAllBuckets();
} else if (message instanceof GetBucketsByMembers) {
- receiveGetBucketsByMembers(
- ((GetBucketsByMembers) message).getMembers());
+ receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(
- ((UpdateRemoteBuckets) message).getBuckets());
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
} else {
if(log.isDebugEnabled()) {
log.debug("Unhandled message [{}]", message);
}
}
- /**
- * Returns a copy of bucket owned by this node
- */
- private void receiveGetLocalBucket() {
- final ActorRef sender = getSender();
- GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
- sender.tell(reply, getSelf());
- }
-
- /**
- * Updates the bucket owned by this node
- *
- * @param updatedBucket
- */
- void receiveUpdateBucket(Bucket updatedBucket){
-
- localBucket = (BucketImpl) updatedBucket;
- versions.put(selfAddress, localBucket.getVersion());
- }
-
/**
* Returns all the buckets the this node knows about, self owned + remote
*/
- void receiveGetAllBucket(){
+ void receiveGetAllBuckets(){
final ActorRef sender = getSender();
sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
}
*
* @return self owned + remote buckets
*/
+ @SuppressWarnings("rawtypes")
Map<Address, Bucket> getAllBuckets(){
Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
- all.put(selfAddress, localBucket);
+ all.put(selfAddress, new BucketImpl<>(localBucket));
//then get all remote buckets
all.putAll(remoteBuckets);
*
* @param members requested members
*/
+ @SuppressWarnings("rawtypes")
void receiveGetBucketsByMembers(Set<Address> members){
final ActorRef sender = getSender();
Map<Address, Bucket> buckets = getBucketsByMembers(members);
* @param members requested members
* @return buckets for requested memebers
*/
+ @SuppressWarnings("rawtypes")
Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
Map<Address, Bucket> buckets = new HashMap<>();
//first add the local bucket if asked
if (members.contains(selfAddress)) {
- buckets.put(selfAddress, localBucket);
+ buckets.put(selfAddress, new BucketImpl<>(localBucket));
}
//then get buckets for requested remote nodes
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
-
+ log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty())
{
return; //nothing to do
Long localVersion = versions.get(entry.getKey());
if (localVersion == null) {
- localVersion = -1L;
+ localVersion = NO_VERSION;
}
- Bucket receivedBucket = entry.getValue();
+ Bucket<T> receivedBucket = entry.getValue();
if (receivedBucket == null) {
continue;
Long remoteVersion = receivedBucket.getVersion();
if (remoteVersion == null) {
- remoteVersion = -1L;
+ remoteVersion = NO_VERSION;
}
//update only if remote version is newer
versions.put(entry.getKey(), remoteVersion);
}
}
+
if(log.isDebugEnabled()) {
log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
}
}
- ///
- ///Getter Setters
- ///
-
- BucketImpl getLocalBucket() {
+ protected BucketImpl<T> getLocalBucket() {
return localBucket;
}
- void setLocalBucket(BucketImpl localBucket) {
- this.localBucket = localBucket;
+ protected void updateLocalBucket(T data) {
+ localBucket.setData(data);
+ versions.put(selfAddress, localBucket.getVersion());
}
- ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+ protected Map<Address, Bucket<T>> getRemoteBuckets() {
return remoteBuckets;
}
- void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
- this.remoteBuckets = remoteBuckets;
- }
-
- ConcurrentMap<Address, Long> getVersions() {
+ @VisibleForTesting
+ Map<Address, Long> getVersions() {
return versions;
}
-
- void setVersions(ConcurrentMap<Address, Long> versions) {
- this.versions = versions;
- }
-
- Address getSelfAddress() {
- return selfAddress;
- }
}
import akka.actor.Address;
import com.google.common.base.Preconditions;
-
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
/**
public static class BucketStoreMessages{
- public static class GetLocalBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- }
-
- public static class ContainsBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- final private Bucket bucket;
-
- public ContainsBucket(Bucket bucket){
- Preconditions.checkArgument(bucket != null, "bucket can not be null");
- this.bucket = bucket;
- }
-
- public Bucket getBucket(){
- return bucket;
- }
-
- }
-
- public static class UpdateBucket extends ContainsBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- public UpdateBucket(Bucket bucket){
- super(bucket);
- }
- }
-
- public static class GetLocalBucketReply extends ContainsBucket implements Serializable {
- private static final long serialVersionUID = 1L;
- public GetLocalBucketReply(Bucket bucket){
- super(bucket);
- }
- }
-
public static class GetAllBuckets implements Serializable {
private static final long serialVersionUID = 1L;
}
public static class GetBucketsByMembers implements Serializable{
private static final long serialVersionUID = 1L;
- private Set<Address> members;
+ private final Set<Address> members;
public GetBucketsByMembers(Set<Address> members){
Preconditions.checkArgument(members != null, "members can not be null");
public static class ContainsBuckets implements Serializable{
private static final long serialVersionUID = 1L;
- private Map<Address, Bucket> buckets;
+ private final Map<Address, Bucket> buckets;
public ContainsBuckets(Map<Address, Bucket> buckets){
Preconditions.checkArgument(buckets != null, "buckets can not be null");
for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
//ignore null entries
- if ( (entry.getKey() == null) || (entry.getValue() == null) )
+ if ( (entry.getKey() == null) || (entry.getValue() == null) ) {
continue;
+ }
copy.put(entry.getKey(), entry.getValue());
}
- return new HashMap<>(copy);
+ return copy;
}
}
public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
private static final long serialVersionUID = 1L;
- private Address from;
+ private final Address from;
public GossipStatus(Address from, Map<Address, Long> versions) {
super(versions);
package org.opendaylight.controller.remote.rpc.registry;
-import akka.actor.ActorPath;
import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import akka.actor.ChildActorPath;
+import akka.actor.Address;
import akka.actor.Props;
-import akka.pattern.Patterns;
+import akka.japi.Pair;
import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nullable;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.utils.ConditionalProbe;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private ActorRef registry2;
private ActorRef registry3;
+ private int routeIdCounter = 1;
+
@BeforeClass
public static void staticSetup() throws InterruptedException {
RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
final JavaTestKit mockBroker = new JavaTestKit(node1);
- final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+ Address nodeAddress = node1.provider().getDefaultAddress();
// Add rpc on node 1
registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
- // install probe
- final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
- Messages.BucketStoreMessages.UpdateBucket.class);
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
- registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
// Bucket store should get an update bucket message. Updated bucket contains added rpc.
- probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateBucket.class);
+
+ Map<Address, Bucket> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+ verifyBucket(buckets.get(nodeAddress), addedRouteIds);
+
+ Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
+ Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
+ versions.get(nodeAddress));
// Now remove rpc
- registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+ registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
// Bucket store should get an update bucket message. Rpc is removed in the updated bucket
- probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateBucket.class);
+
+ verifyEmptyBucket(mockBroker, registry1, nodeAddress);
System.out.println("testAddRemoveRpcOnSameNode ending");
System.out.println("testRpcAddRemoveInCluster starting");
final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
- // install probe on node2's bucket store
- final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
- final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ Address node1Address = node1.provider().getDefaultAddress();
// Add rpc on node 1
registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
// Bucket store on node2 should get a message to update its local copy of remote buckets
- probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ Map<Address, Bucket> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+ verifyBucket(buckets.get(node1Address), addedRouteIds);
// Now remove
- registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+ registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
- // Bucket store on node2 should get a message to update its local copy of remote buckets
- probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ // Bucket store on node2 should get a message to update its local copy of remote buckets.
+ // Wait for the bucket for node1 to be empty.
+
+ verifyEmptyBucket(mockBroker2, registry2, node1Address);
System.out.println("testRpcAddRemoveInCluster ending");
}
+ private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
+ throws AssertionError {
+ Map<Address, Bucket> buckets;
+ int nTries = 0;
+ while(true) {
+ buckets = retrieveBuckets(registry1, testKit, address);
+
+ try {
+ verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
+ break;
+ } catch (AssertionError e) {
+ if(++nTries >= 50) {
+ throw e;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+ }
+
/**
* Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
*
registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
- // install probe on node 3
- final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
- final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
// Add rpc on node 1
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
- probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
- // Add same rpc on node 2
+ // Add rpc on node 2
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
- registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+ registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
+
+ Address node1Address = node1.provider().getDefaultAddress();
+ Address node2Address = node2.provider().getDefaultAddress();
+
+ Map<Address, Bucket> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+ node2Address);
- probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
- Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ verifyBucket(buckets.get(node1Address), addedRouteIds1);
+ verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+ Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
+ Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
+ versions.get(node1Address));
+ Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
+ versions.get(node2Address));
+
+ RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
+ registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
+
+ FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ FindRoutersReply.class);
+
+ List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+ Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+
+ respList.get(0).first().tell("hello", ActorRef.noSender());
+ mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
}
- private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
- throws Exception {
- final JavaTestKit probe = new JavaTestKit(node);
-
- ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
- @Override
- public boolean apply(@Nullable Object input) {
- if (input != null) {
- return clazz.equals(input.getClass());
- } else {
- return false;
- }
+ private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
+ bucketStore.tell(new GetBucketVersions(), testKit.getRef());
+ GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ GetBucketVersionsReply.class);
+ return reply.getVersions();
+ }
+
+ private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+ RoutingTable table = bucket.getData();
+ Assert.assertNotNull("Bucket RoutingTable is null", table);
+ for(RouteIdentifier<?, ?, ?> r: expRouteIds) {
+ if(!table.contains(r)) {
+ Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
}
- });
+ }
- FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
- Timeout timeout = new Timeout(duration);
- int maxTries = 30;
- int i = 0;
- while(true) {
- ActorSelection subject = node.actorSelection(subjectPath);
- Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+ Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
+ }
- try {
- Await.ready(future, duration);
- break;
- } catch (TimeoutException | InterruptedException e) {
- if(++i > maxTries) {
- throw e;
+ private Map<Address, Bucket> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
+ Address... addresses) {
+ int nTries = 0;
+ while(true) {
+ bucketStore.tell(new GetAllBuckets(), testKit.getRef());
+ GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+ GetAllBucketsReply.class);
+
+ Map<Address, Bucket> buckets = reply.getBuckets();
+ boolean foundAll = true;
+ for(Address addr: addresses) {
+ Bucket bucket = buckets.get(addr);
+ if(bucket == null) {
+ foundAll = false;
+ break;
}
}
- }
- return probe;
+ if(foundAll) {
+ return buckets;
+ }
- }
+ if(++nTries >= 50) {
+ Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+ + ", Actual: " + buckets);
+ }
- private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
- return new AddOrUpdateRoutes(createRouteIds());
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
}
- private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
- return new RemoveRoutes(createRouteIds());
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAddRoutesConcurrency() throws Exception {
+ final JavaTestKit testKit = new JavaTestKit(node1);
+
+ registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
+
+ final int nRoutes = 500;
+ final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+ for(int i = 0; i < nRoutes; i++) {
+ final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
+ new QName(new URI("/mockrpc"), "type" + i), null);
+ added[i] = routeId;
+
+ //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+ ActorRef.noSender());
+ }
+
+ GetAllBuckets getAllBuckets = new GetAllBuckets();
+ FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+ int nTries = 0;
+ while(true) {
+ registry1.tell(getAllBuckets, testKit.getRef());
+ GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+
+ Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+ RoutingTable table = localBucket.getData();
+ if(table != null && table.size() == nRoutes) {
+ for(RouteIdentifier<?, ?, ?> r: added) {
+ Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+ }
+
+ break;
+ }
+
+ if(++nTries >= 50) {
+ Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+ }
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
}
private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
- QName type = new QName(new URI("/mockrpc"), "mockrpc");
+ QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
routeIds.add(new RouteIdentifierImpl(null, type, null));
return routeIds;
}
-
}
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import java.util.HashMap;
-import java.util.Map;
-
public class BucketStoreTest {
private static ActorSystem system;
- private static BucketStore store;
@BeforeClass
public static void setup() {
system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
- store = createStore();
}
@AfterClass
system.shutdown();
}
- /**
- * Given a new local bucket
- * Should replace
- */
- @Test
- public void testReceiveUpdateBucket(){
- Bucket bucket = new BucketImpl();
- Long expectedVersion = bucket.getVersion();
-
- store.receiveUpdateBucket(bucket);
-
- Assert.assertEquals(bucket, store.getLocalBucket());
- Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion());
- }
-
/**
* Given remote buckets
* Should merge with local copy of remote buckets
@Test
public void testReceiveUpdateRemoteBuckets(){
+ BucketStore store = createStore();
+
Address localAddress = system.provider().getDefaultAddress();
Bucket localBucket = new BucketImpl();
//Should NOT contain local bucket
//Should contain ONLY 3 entries i.e a1, a2, a3
- Map<Address, Bucket> remoteBucketsInStore = store.getRemoteBuckets();
+ Map<Address, Bucket<?>> remoteBucketsInStore = store.getRemoteBuckets();
Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
Assert.assertTrue(remoteBucketsInStore.size() == 3);
Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Should update versions map
- //versions map contains versions for all remote buckets (4) + local bucket
- //so it should have total 5.
+ //versions map contains versions for all remote buckets (4).
Map<Address, Long> versionsInStore = store.getVersions();
- Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()),
- versionsInStore.size() == 5);
+ Assert.assertEquals(4, versionsInStore.size());
Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3));
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
* Internal {@link TransactionChainListener} joining all DS commits
* to Set of chained changes for prevent often DataStore touches.
*/
- public interface StatDataStoreOperation {
+ public abstract class StatDataStoreOperation {
+ public enum StatsManagerOperationType {
+ /**
+ * Operation will carry out work related to new node addition /
+ * update
+ */
+ NODE_UPDATE,
+ /**
+ * Operation will carry out work related to node removal
+ */
+ NODE_REMOVAL,
+ /**
+ * Operation will commit data to the operational data store
+ */
+ DATA_COMMIT_OPER_DS
+ }
+
+ private NodeId nodeId;
+ private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+
+ public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
+ if(operType != null){
+ operationType = operType;
+ }
+ nodeId = id;
+ }
+
+ public final StatsManagerOperationType getType() {
+ return operationType;
+ }
+
+ public final NodeId getNodeId(){
+ return nodeId;
+ }
/**
- * Apply all read / write (put|merge) operation
- * for DataStore
+ * Apply all read / write (put|merge) operation for DataStore
+ *
* @param {@link ReadWriteTransaction} tx
*/
- void applyOperation(ReadWriteTransaction tx);
+ public abstract void applyOperation(ReadWriteTransaction tx);
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
return;
}
/* check flow Capable Node and write statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
return;
}
/* add flow's statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
});
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
"InstanceIdentifier {} is WildCarded!", nodeIdent);
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
manager.disconnectedNodeUnregistration(nodeIdent);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId));
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
return;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final List<FlowTableAndStatisticsMap> tableStats = new ArrayList<FlowTableAndStatisticsMap>(10);
package org.opendaylight.controller.md.statistics.manager.impl;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
@Override
public void onSuccess(final RpcResult<? extends TransactionAware> result) {
final TransactionId id = result.getResult().getTransactionId();
+ final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
if (id == null) {
- LOG.warn("No protocol support");
+ String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})");
+ LOG.warn("Node [{}] does not support statistics request type : {}",
+ nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2)));
} else {
if (resultTransId != null) {
resultTransId.set(id);
}
- final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class);
final String cacheKey = buildCacheKey(id, nodeKey.getId());
final TransactionCacheContainer<? super TransactionAware> container =
new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId());
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
- dataStoreOperQueue.poll();
+ StatDataStoreOperation op = dataStoreOperQueue.poll();
+
+ // Execute the node removal clean up operation if queued in the
+ // operational queue.
+ if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
+ try {
+ LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
+ op.applyOperation(null);
+ } catch (final Exception ex) {
+ LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
+ }
+ }
}
}
<module>opendaylight/commons/liblldp</module>
<!-- Karaf Distribution -->
- <module>opendaylight/karaf-branding</module>
- <module>opendaylight/distribution/opendaylight-karaf-empty</module>
- <module>opendaylight/distribution/opendaylight-karaf</module>
- <module>opendaylight/distribution/opendaylight-karaf-resources</module>
+ <module>karaf</module>
<module>features</module>
<!-- archetypes -->