.DS_STORE
.metadata
opendaylight/md-sal/sal-distributed-datastore/journal
+!opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin
+
</parent>
<artifactId>features-config-netty</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<artifactId>features-config-persister</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-netty-config</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
</parent>
<artifactId>features-config-persister</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<version>${yangtools.version}</version>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-netconf</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-config</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<groupId>org.eclipse.persistence</groupId>
<artifactId>org.eclipse.persistence.moxy</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
</parent>
<artifactId>features-config</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<version>${yangtools.version}</version>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-manager</artifactId>
</dependency>
+ <!-- test the features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
<repository>mvn:org.opendaylight.yangtools/features-yangtools/${yangtools.version}/xml/features</repository>
<feature name='odl-config-all' version='${project.version}'>
- <feature version='${project.version}'>odl-mdsal-common</feature>
+ <feature version='${mdsal.version}'>odl-mdsal-common</feature>
<feature version='${project.version}'>odl-config-api</feature>
<feature version='${project.version}'>odl-config-netty-config-api</feature>
<feature version='${project.version}'>odl-config-core</feature>
</parent>
<artifactId>features-flow</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<version>${mdsal.version}</version>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller.model</groupId>
<groupId>org.opendaylight.controller.md</groupId>
<artifactId>forwardingrules-manager</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
</parent>
<artifactId>features-mdsal</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<artifactId>features-yangtools</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-config</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-config-persister</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-config-netty</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<groupId>org.opendaylight.controller.samples</groupId>
<artifactId>toaster-config</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ <version>0.6.2-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
</parent>
<artifactId>features-protocol-framework</artifactId>
<version>${protocol-framework.version}</version>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<artifactId>features-config</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>protocol-framework</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
<artifactId>toaster-config</artifactId>
<version>${mdsal.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ <version>${yangtools.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>features-yangtools</artifactId>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.opendaylight</artifactId>
+ <version>1.4.2-SNAPSHOT</version>
+ <relativePath>../../commons/opendaylight</relativePath>
+ </parent>
+ <artifactId>opendaylight-karaf-empty</artifactId>
+ <packaging>pom</packaging>
+ <prerequisites>
+ <maven>3.0</maven>
+ </prerequisites>
+
+ <dependencies>
+ <dependency>
+ <!-- scope is compile so all features (there is only one) are installed
+ into startup.properties and the feature repo itself is not installed -->
+ <groupId>org.apache.karaf.features</groupId>
+ <artifactId>framework</artifactId>
+ <version>${karaf.version}</version>
+ <type>kar</type>
+ </dependency>
+ <!-- scope is runtime so the feature repo is listed in the features
+ service config file, and features may be installed using the
+ karaf-maven-plugin configuration -->
+ <dependency>
+ <groupId>org.apache.karaf.features</groupId>
+ <artifactId>standard</artifactId>
+ <version>${karaf.version}</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- ODL Branding -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>karaf.branding</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Resources needed -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-resources</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>cleanVersions</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>copy</goal>
+ <goal>unpack</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.karaf.tooling</groupId>
+ <artifactId>karaf-maven-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>commands-generate-help</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.fusesource.scalate</groupId>
+ <artifactId>maven-scalate-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>sitegen</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.servicemix.tooling</groupId>
+ <artifactId>depends-maven-plugin</artifactId>
+ <versionRange>[0,)</versionRange>
+ <goals>
+ <goal>generate-depends-file</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.karaf.tooling</groupId>
+ <artifactId>karaf-maven-plugin</artifactId>
+ <version>${karaf.version}</version>
+ <extensions>true</extensions>
+ <executions>
+ <execution>
+ <id>process-resources</id>
+ <goals>
+ <goal>install-kars</goal>
+ </goals>
+ <phase>process-resources</phase>
+ </execution>
+ <execution>
+ <id>package</id>
+ <goals>
+ <goal>instance-create-archive</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${checkstyle.version}</version>
+ <configuration>
+ <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/configuration\/initial\/</excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <!-- here the phase you need -->
+ <phase>generate-resources</phase>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>karaf.branding</artifactId>
+ <version>${karaf.branding.version}</version>
+ <outputDirectory>target/assembly/lib</outputDirectory>
+ <destFileName>karaf.branding-${branding.version}.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ <execution>
+ <id>unpack-karaf-resources</id>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ <configuration>
+ <outputDirectory>${project.build.directory}/assembly</outputDirectory>
+ <groupId>org.opendaylight.controller</groupId>
+ <includeArtifactIds>opendaylight-karaf-resources</includeArtifactIds>
+ <excludes>META-INF\/**</excludes>
+ <excludeTransitive>true</excludeTransitive>
+ <ignorePermissions>false</ignorePermissions>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <tasks>
+ <chmod perm="755">
+ <fileset dir="${project.build.directory}/assembly/bin">
+ <include name="karaf"/>
+ <include name="instance"/>
+ </fileset>
+ </chmod>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:Main</url>
+ </scm>
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>commons.opendaylight</artifactId>
+ <version>1.4.2-SNAPSHOT</version>
+ <relativePath>../../commons/opendaylight</relativePath>
+ </parent>
+ <artifactId>opendaylight-karaf-resources</artifactId>
+ <description>Resources for opendaylight-karaf</description>
+ <packaging>jar</packaging>
+</project>
<version>${karaf.version}</version>
<type>kar</type>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>base-features</artifactId>
- <version>${project.version}</version>
- <type>kar</type>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>karaf.branding</artifactId>
<scope>compile</scope>
</dependency>
- <!-- scope is runtime so the feature repo is listed in the features
- service config file, and features may be installed using the
- karaf-maven-plugin configuration -->
- <dependency>
- <groupId>org.apache.karaf.features</groupId>
- <artifactId>standard</artifactId>
- <version>${karaf.version}</version>
- <classifier>features</classifier>
- <type>xml</type>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>base-features</artifactId>
- <version>${project.parent.version}</version>
- <classifier>features</classifier>
- <type>xml</type>
- <scope>runtime</scope>
- </dependency>
- <!-- scope is compile so all features (there is only one) are installed
- into startup.properties and the feature repo itself is not installed -->
+
+ <!-- Resources needed -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
- <artifactId>extras-features</artifactId>
+ <artifactId>opendaylight-karaf-resources</artifactId>
<version>${project.version}</version>
- <type>kar</type>
- <scope>runtime</scope>
- </dependency>
- <!-- AD-SAL Related Features -->
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>features-adsal</artifactId>
- <classifier>features</classifier>
- <type>xml</type>
- <scope>runtime</scope>
</dependency>
+
+ <!-- scope is not runtime so the feature repo is pulled into the local
+ repo on build and thus you actually run. Failure to do so can lead
+ to very confusing errors for devs -->
<dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>features-nsf</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.karaf.features</groupId>
+ <artifactId>standard</artifactId>
+ <version>${karaf.version}</version>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
+ <!--
+ controller provided features:
+ Note: Nothing should go here that is not locked
+ down with testing... ie, no broken feature repos
+ -->
+
<!-- MD-SAL Related Features -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-mdsal</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>features-flow</artifactId>
<classifier>features</classifier>
<type>xml</type>
- <scope>runtime</scope>
</dependency>
</dependencies>
</artifactItems>
</configuration>
</execution>
+ <execution>
+ <id>unpack-karaf-resources</id>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ <configuration>
+ <outputDirectory>${project.build.directory}/assembly</outputDirectory>
+ <groupId>org.opendaylight.controller</groupId>
+ <includeArtifactIds>opendaylight-karaf-resources</includeArtifactIds>
+ <excludes>META-INF\/**</excludes>
+ <excludeTransitive>true</excludeTransitive>
+ <ignorePermissions>false</ignorePermissions>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <tasks>
+ <chmod perm="755">
+ <fileset dir="${project.build.directory}/assembly/bin">
+ <include name="karaf"/>
+ <include name="instance"/>
+ </fileset>
+ </chmod>
+ </tasks>
+ </configuration>
+ </execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <phase>prepare-package</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <tasks>
- <copy todir="${project.build.directory}/assembly/bin" overwrite="true">
- <fileset dir="${basedir}/src/main/resources/karaf/" includes="karaf,karaf.bat,instance,instance.bat"/>
- </copy>
- </tasks>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
<scm>
+++ /dev/null
-################################################################################
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-################################################################################
-
-#
-# Java platform package export properties.
-#
-
-# Standard package set. Note that:
-# - javax.transaction* is exported with a mandatory attribute
-jre-1.6= \
- javax.accessibility, \
- javax.activation;version="1.1", \
- javax.activity, \
- javax.crypto, \
- javax.crypto.interfaces, \
- javax.crypto.spec, \
- javax.imageio, \
- javax.imageio.event, \
- javax.imageio.metadata, \
- javax.imageio.plugins.bmp, \
- javax.imageio.plugins.jpeg, \
- javax.imageio.spi, \
- javax.imageio.stream, \
- javax.jws, \
- javax.jws.soap, \
- javax.lang.model, \
- javax.lang.model.element, \
- javax.lang.model.type, \
- javax.lang.model.util, \
- javax.management, \
- javax.management.loading, \
- javax.management.modelmbean, \
- javax.management.monitor, \
- javax.management.openmbean, \
- javax.management.relation, \
- javax.management.remote, \
- javax.management.remote.rmi, \
- javax.management.timer, \
- javax.naming, \
- javax.naming.directory, \
- javax.naming.event, \
- javax.naming.ldap, \
- javax.naming.spi, \
- javax.net, \
- javax.net.ssl, \
- javax.print, \
- javax.print.attribute, \
- javax.print.attribute.standard, \
- javax.print.event, \
- javax.rmi, \
- javax.rmi.CORBA, \
- javax.rmi.ssl, \
- javax.script, \
- javax.security.auth, \
- javax.security.auth.callback, \
- javax.security.auth.kerberos, \
- javax.security.auth.login, \
- javax.security.auth.spi, \
- javax.security.auth.x500, \
- javax.security.cert, \
- javax.security.sasl, \
- javax.sound.midi, \
- javax.sound.midi.spi, \
- javax.sound.sampled, \
- javax.sound.sampled.spi, \
- javax.sql, \
- javax.sql.rowset, \
- javax.sql.rowset.serial, \
- javax.sql.rowset.spi, \
- javax.swing, \
- javax.swing.border, \
- javax.swing.colorchooser, \
- javax.swing.event, \
- javax.swing.filechooser, \
- javax.swing.plaf, \
- javax.swing.plaf.basic, \
- javax.swing.plaf.metal, \
- javax.swing.plaf.multi, \
- javax.swing.plaf.synth, \
- javax.swing.table, \
- javax.swing.text, \
- javax.swing.text.html, \
- javax.swing.text.html.parser, \
- javax.swing.text.rtf, \
- javax.swing.tree, \
- javax.swing.undo, \
- javax.tools, \
- javax.transaction; javax.transaction.xa; partial=true; mandatory:=partial, \
- javax.xml, \
- javax.xml.bind;version="2.2.1", \
- javax.xml.bind.annotation;version="2.2.1", \
- javax.xml.bind.annotation.adapters;version="2.2.1", \
- javax.xml.bind.attachment;version="2.2.1", \
- javax.xml.bind.helpers;version="2.2.1", \
- javax.xml.bind.util;version="2.2.1", \
- javax.xml.crypto, \
- javax.xml.crypto.dom, \
- javax.xml.crypto.dsig, \
- javax.xml.crypto.dsig.dom, \
- javax.xml.crypto.dsig.keyinfo, \
- javax.xml.crypto.dsig.spec, \
- javax.xml.datatype, \
- javax.xml.namespace, \
- javax.xml.parsers, \
- javax.xml.soap;version="1.3", \
- javax.xml.stream;version="1.2", \
- javax.xml.stream.events;version="1.2", \
- javax.xml.stream.util;version="1.2", \
- javax.xml.transform, \
- javax.xml.transform.dom, \
- javax.xml.transform.sax, \
- javax.xml.transform.stax, \
- javax.xml.transform.stream, \
- javax.xml.validation, \
- javax.xml.ws;version="2.2", \
- javax.xml.ws.handler;version="2.2", \
- javax.xml.ws.handler.soap;version="2.2", \
- javax.xml.ws.http;version="2.2", \
- javax.xml.ws.soap;version="2.2", \
- javax.xml.ws.spi;version="2.2", \
- javax.xml.ws.wsaddressing;version="2.2", \
- javax.xml.ws.spi.http;version="2.2", \
- javax.xml.xpath, \
- org.ietf.jgss, \
- org.omg.CORBA, \
- org.omg.CORBA_2_3, \
- org.omg.CORBA_2_3.portable, \
- org.omg.CORBA.DynAnyPackage, \
- org.omg.CORBA.ORBPackage, \
- org.omg.CORBA.portable, \
- org.omg.CORBA.TypeCodePackage, \
- org.omg.CosNaming, \
- org.omg.CosNaming.NamingContextExtPackage, \
- org.omg.CosNaming.NamingContextPackage, \
- org.omg.Dynamic, \
- org.omg.DynamicAny, \
- org.omg.DynamicAny.DynAnyFactoryPackage, \
- org.omg.DynamicAny.DynAnyPackage, \
- org.omg.IOP, \
- org.omg.IOP.CodecFactoryPackage, \
- org.omg.IOP.CodecPackage, \
- org.omg.Messaging, \
- org.omg.PortableInterceptor, \
- org.omg.PortableInterceptor.ORBInitInfoPackage, \
- org.omg.PortableServer, \
- org.omg.PortableServer.CurrentPackage, \
- org.omg.PortableServer.POAManagerPackage, \
- org.omg.PortableServer.POAPackage, \
- org.omg.PortableServer.portable, \
- org.omg.PortableServer.ServantLocatorPackage, \
- org.omg.SendingContext, \
- org.omg.stub.java.rmi, \
- org.omg.stub.javax.management.remote.rmi, \
- org.w3c.dom, \
- org.w3c.dom.bootstrap, \
- org.w3c.dom.css, \
- org.w3c.dom.events, \
- org.w3c.dom.html, \
- org.w3c.dom.ls, \
- org.w3c.dom.ranges, \
- org.w3c.dom.stylesheets, \
- org.w3c.dom.traversal, \
- org.w3c.dom.views, \
- org.w3c.dom.xpath, \
- org.xml.sax, \
- org.xml.sax.ext, \
- org.xml.sax.helpers, \
- javax.annotation.processing
-
-# Standard package set. Note that:
-# - javax.transaction* is exported with a mandatory attribute
-jre-1.7= \
- javax.accessibility, \
- javax.activation;version="1.1", \
- javax.activity, \
- javax.crypto, \
- javax.crypto.interfaces, \
- javax.crypto.spec, \
- javax.imageio, \
- javax.imageio.event, \
- javax.imageio.metadata, \
- javax.imageio.plugins.bmp, \
- javax.imageio.plugins.jpeg, \
- javax.imageio.spi, \
- javax.imageio.stream, \
- javax.jws, \
- javax.jws.soap, \
- javax.lang.model, \
- javax.lang.model.element, \
- javax.lang.model.type, \
- javax.lang.model.util, \
- javax.management, \
- javax.management.loading, \
- javax.management.modelmbean, \
- javax.management.monitor, \
- javax.management.openmbean, \
- javax.management.relation, \
- javax.management.remote, \
- javax.management.remote.rmi, \
- javax.management.timer, \
- javax.naming, \
- javax.naming.directory, \
- javax.naming.event, \
- javax.naming.ldap, \
- javax.naming.spi, \
- javax.net, \
- javax.net.ssl, \
- javax.print, \
- javax.print.attribute, \
- javax.print.attribute.standard, \
- javax.print.event, \
- javax.rmi, \
- javax.rmi.CORBA, \
- javax.rmi.ssl, \
- javax.script, \
- javax.security.auth, \
- javax.security.auth.callback, \
- javax.security.auth.kerberos, \
- javax.security.auth.login, \
- javax.security.auth.spi, \
- javax.security.auth.x500, \
- javax.security.cert, \
- javax.security.sasl, \
- javax.sound.midi, \
- javax.sound.midi.spi, \
- javax.sound.sampled, \
- javax.sound.sampled.spi, \
- javax.sql, \
- javax.sql.rowset, \
- javax.sql.rowset.serial, \
- javax.sql.rowset.spi, \
- javax.swing, \
- javax.swing.border, \
- javax.swing.colorchooser, \
- javax.swing.event, \
- javax.swing.filechooser, \
- javax.swing.plaf, \
- javax.swing.plaf.basic, \
- javax.swing.plaf.metal, \
- javax.swing.plaf.multi, \
- javax.swing.plaf.synth, \
- javax.swing.table, \
- javax.swing.text, \
- javax.swing.text.html, \
- javax.swing.text.html.parser, \
- javax.swing.text.rtf, \
- javax.swing.tree, \
- javax.swing.undo, \
- javax.tools, \
- javax.transaction; javax.transaction.xa; partial=true; mandatory:=partial, \
- javax.xml, \
- javax.xml.bind;version="2.2.1", \
- javax.xml.bind.annotation;version="2.2.1", \
- javax.xml.bind.annotation.adapters;version="2.2.1", \
- javax.xml.bind.attachment;version="2.2.1", \
- javax.xml.bind.helpers;version="2.2.1", \
- javax.xml.bind.util;version="2.2.1", \
- javax.xml.crypto, \
- javax.xml.crypto.dom, \
- javax.xml.crypto.dsig, \
- javax.xml.crypto.dsig.dom, \
- javax.xml.crypto.dsig.keyinfo, \
- javax.xml.crypto.dsig.spec, \
- javax.xml.datatype, \
- javax.xml.namespace, \
- javax.xml.parsers, \
- javax.xml.soap;version="1.3", \
- javax.xml.stream;version="1.2", \
- javax.xml.stream.events;version="1.2", \
- javax.xml.stream.util;version="1.2", \
- javax.xml.transform, \
- javax.xml.transform.dom, \
- javax.xml.transform.sax, \
- javax.xml.transform.stax, \
- javax.xml.transform.stream, \
- javax.xml.validation, \
- javax.xml.ws;version="2.2", \
- javax.xml.ws.handler;version="2.2", \
- javax.xml.ws.handler.soap;version="2.2", \
- javax.xml.ws.http;version="2.2", \
- javax.xml.ws.soap;version="2.2", \
- javax.xml.ws.spi;version="2.2", \
- javax.xml.ws.wsaddressing;version="2.2", \
- javax.xml.ws.spi.http;version="2.2", \
- javax.xml.xpath, \
- org.ietf.jgss, \
- org.omg.CORBA, \
- org.omg.CORBA_2_3, \
- org.omg.CORBA_2_3.portable, \
- org.omg.CORBA.DynAnyPackage, \
- org.omg.CORBA.ORBPackage, \
- org.omg.CORBA.portable, \
- org.omg.CORBA.TypeCodePackage, \
- org.omg.CosNaming, \
- org.omg.CosNaming.NamingContextExtPackage, \
- org.omg.CosNaming.NamingContextPackage, \
- org.omg.Dynamic, \
- org.omg.DynamicAny, \
- org.omg.DynamicAny.DynAnyFactoryPackage, \
- org.omg.DynamicAny.DynAnyPackage, \
- org.omg.IOP, \
- org.omg.IOP.CodecFactoryPackage, \
- org.omg.IOP.CodecPackage, \
- org.omg.Messaging, \
- org.omg.PortableInterceptor, \
- org.omg.PortableInterceptor.ORBInitInfoPackage, \
- org.omg.PortableServer, \
- org.omg.PortableServer.CurrentPackage, \
- org.omg.PortableServer.POAManagerPackage, \
- org.omg.PortableServer.POAPackage, \
- org.omg.PortableServer.portable, \
- org.omg.PortableServer.ServantLocatorPackage, \
- org.omg.SendingContext, \
- org.omg.stub.java.rmi, \
- org.omg.stub.javax.management.remote.rmi, \
- org.w3c.dom, \
- org.w3c.dom.bootstrap, \
- org.w3c.dom.css, \
- org.w3c.dom.events, \
- org.w3c.dom.html, \
- org.w3c.dom.ls, \
- org.w3c.dom.ranges, \
- org.w3c.dom.stylesheets, \
- org.w3c.dom.traversal, \
- org.w3c.dom.views, \
- org.w3c.dom.xpath, \
- org.xml.sax, \
- org.xml.sax.ext, \
- org.xml.sax.helpers, \
- javax.annotation.processing
-
-jre-1.8= \
- javax.accessibility, \
- javax.activation;version="1.1", \
- javax.activity, \
- javax.crypto, \
- javax.crypto.interfaces, \
- javax.crypto.spec, \
- javax.imageio, \
- javax.imageio.event, \
- javax.imageio.metadata, \
- javax.imageio.plugins.bmp, \
- javax.imageio.plugins.jpeg, \
- javax.imageio.spi, \
- javax.imageio.stream, \
- javax.jws, \
- javax.jws.soap, \
- javax.lang.model, \
- javax.lang.model.element, \
- javax.lang.model.type, \
- javax.lang.model.util, \
- javax.management, \
- javax.management.loading, \
- javax.management.modelmbean, \
- javax.management.monitor, \
- javax.management.openmbean, \
- javax.management.relation, \
- javax.management.remote, \
- javax.management.remote.rmi, \
- javax.management.timer, \
- javax.naming, \
- javax.naming.directory, \
- javax.naming.event, \
- javax.naming.ldap, \
- javax.naming.spi, \
- javax.net, \
- javax.net.ssl, \
- javax.print, \
- javax.print.attribute, \
- javax.print.attribute.standard, \
- javax.print.event, \
- javax.rmi, \
- javax.rmi.CORBA, \
- javax.rmi.ssl, \
- javax.script, \
- javax.security.auth, \
- javax.security.auth.callback, \
- javax.security.auth.kerberos, \
- javax.security.auth.login, \
- javax.security.auth.spi, \
- javax.security.auth.x500, \
- javax.security.cert, \
- javax.security.sasl, \
- javax.sound.midi, \
- javax.sound.midi.spi, \
- javax.sound.sampled, \
- javax.sound.sampled.spi, \
- javax.sql, \
- javax.sql.rowset, \
- javax.sql.rowset.serial, \
- javax.sql.rowset.spi, \
- javax.swing, \
- javax.swing.border, \
- javax.swing.colorchooser, \
- javax.swing.event, \
- javax.swing.filechooser, \
- javax.swing.plaf, \
- javax.swing.plaf.basic, \
- javax.swing.plaf.metal, \
- javax.swing.plaf.multi, \
- javax.swing.plaf.synth, \
- javax.swing.table, \
- javax.swing.text, \
- javax.swing.text.html, \
- javax.swing.text.html.parser, \
- javax.swing.text.rtf, \
- javax.swing.tree, \
- javax.swing.undo, \
- javax.tools, \
- javax.transaction; javax.transaction.xa; partial=true; mandatory:=partial, \
- javax.xml, \
- javax.xml.bind;version="2.2.1", \
- javax.xml.bind.annotation;version="2.2.1", \
- javax.xml.bind.annotation.adapters;version="2.2.1", \
- javax.xml.bind.attachment;version="2.2.1", \
- javax.xml.bind.helpers;version="2.2.1", \
- javax.xml.bind.util;version="2.2.1", \
- javax.xml.crypto, \
- javax.xml.crypto.dom, \
- javax.xml.crypto.dsig, \
- javax.xml.crypto.dsig.dom, \
- javax.xml.crypto.dsig.keyinfo, \
- javax.xml.crypto.dsig.spec, \
- javax.xml.datatype, \
- javax.xml.namespace, \
- javax.xml.parsers, \
- javax.xml.soap;version="1.3", \
- javax.xml.stream;version="1.2", \
- javax.xml.stream.events;version="1.2", \
- javax.xml.stream.util;version="1.2", \
- javax.xml.transform, \
- javax.xml.transform.dom, \
- javax.xml.transform.sax, \
- javax.xml.transform.stax, \
- javax.xml.transform.stream, \
- javax.xml.validation, \
- javax.xml.ws;version="2.2", \
- javax.xml.ws.handler;version="2.2", \
- javax.xml.ws.handler.soap;version="2.2", \
- javax.xml.ws.http;version="2.2", \
- javax.xml.ws.soap;version="2.2", \
- javax.xml.ws.spi;version="2.2", \
- javax.xml.ws.wsaddressing;version="2.2", \
- javax.xml.ws.spi.http;version="2.2", \
- javax.xml.xpath, \
- org.ietf.jgss, \
- org.omg.CORBA, \
- org.omg.CORBA_2_3, \
- org.omg.CORBA_2_3.portable, \
- org.omg.CORBA.DynAnyPackage, \
- org.omg.CORBA.ORBPackage, \
- org.omg.CORBA.portable, \
- org.omg.CORBA.TypeCodePackage, \
- org.omg.CosNaming, \
- org.omg.CosNaming.NamingContextExtPackage, \
- org.omg.CosNaming.NamingContextPackage, \
- org.omg.Dynamic, \
- org.omg.DynamicAny, \
- org.omg.DynamicAny.DynAnyFactoryPackage, \
- org.omg.DynamicAny.DynAnyPackage, \
- org.omg.IOP, \
- org.omg.IOP.CodecFactoryPackage, \
- org.omg.IOP.CodecPackage, \
- org.omg.Messaging, \
- org.omg.PortableInterceptor, \
- org.omg.PortableInterceptor.ORBInitInfoPackage, \
- org.omg.PortableServer, \
- org.omg.PortableServer.CurrentPackage, \
- org.omg.PortableServer.POAManagerPackage, \
- org.omg.PortableServer.POAPackage, \
- org.omg.PortableServer.portable, \
- org.omg.PortableServer.ServantLocatorPackage, \
- org.omg.SendingContext, \
- org.omg.stub.java.rmi, \
- org.omg.stub.javax.management.remote.rmi, \
- org.w3c.dom, \
- org.w3c.dom.bootstrap, \
- org.w3c.dom.css, \
- org.w3c.dom.events, \
- org.w3c.dom.html, \
- org.w3c.dom.ls, \
- org.w3c.dom.ranges, \
- org.w3c.dom.stylesheets, \
- org.w3c.dom.traversal, \
- org.w3c.dom.views, \
- org.w3c.dom.xpath, \
- org.xml.sax, \
- org.xml.sax.ext, \
- org.xml.sax.helpers, \
- javax.annotation.processing
+++ /dev/null
-#Bundles to be started on startup, with startlevel
-
-# feature: framework version: 3.0.1
-mvn\:org.ops4j.base/ops4j-base-lang/1.4.0 = 5
-mvn\:biz.aQute.bnd/bndlib/2.2.0 = 5
-mvn\:org.ops4j.pax.swissbox/pax-swissbox-bnd/1.7.0 = 5
-mvn\:org.ops4j.pax.url/pax-url-maven-commons/1.6.0 = 5
-mvn\:org.ops4j.pax.url/pax-url-aether/1.6.0 = 5
-mvn\:org.ops4j.pax.url/pax-url-wrap/1.6.0 = 5
-mvn\:javax.annotation/javax.annotation-api/1.2 = 5
-mvn\:org.ops4j.pax.logging/pax-logging-api/1.7.2 = 8
-mvn\:org.ops4j.pax.logging/pax-logging-service/1.7.2 = 8
-mvn\:org.apache.karaf.service/org.apache.karaf.service.guard/3.0.1 = 10
-mvn\:org.apache.felix/org.apache.felix.configadmin/1.6.0 = 10
-mvn\:org.apache.felix/org.apache.felix.fileinstall/3.2.8 = 11
-mvn\:org.ow2.asm/asm-all/4.1 = 12
-mvn\:org.apache.aries/org.apache.aries.util/1.1.0 = 20
-mvn\:org.apache.aries.proxy/org.apache.aries.proxy.api/1.0.0 = 20
-mvn\:org.apache.aries.proxy/org.apache.aries.proxy.impl/1.0.2 = 20
-mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.api/1.0.0 = 20
-mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.cm/1.0.3 = 20
-mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.core.compatibility/1.0.0 = 20
-mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.core/1.4.0 = 20
-mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.spring/3.0.1 = 24
-mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.blueprint/3.0.1 = 24
-mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.wrap/3.0.1 = 24
-mvn\:org.apache.karaf.region/org.apache.karaf.region.core/3.0.1 = 25
-mvn\:org.apache.karaf.features/org.apache.karaf.features.core/3.0.1 = 25
-mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.features/3.0.1 = 26
-mvn\:jline/jline/2.11 = 30
-mvn\:org.jledit/core/0.2.1 = 30
-mvn\:org.fusesource.jansi/jansi/1.11 = 30
-mvn\:org.ops4j.base/ops4j-base-util-property/1.4.0 = 30
-mvn\:org.ops4j.base/ops4j-base-util-xml/1.4.0 = 30
-mvn\:org.ops4j.base/ops4j-base-util-collections/1.4.0 = 30
-mvn\:org.ops4j.pax.url/pax-url-commons/1.6.0 = 30
-mvn\:org.ops4j.pax.swissbox/pax-swissbox-property/1.7.0 = 30
-mvn\:org.ops4j.base/ops4j-base-net/1.4.0 = 30
-mvn\:org.ops4j.base/ops4j-base-monitors/1.4.0 = 30
-mvn\:org.apache.karaf.features/org.apache.karaf.features.command/3.0.1 = 30
-mvn\:org.apache.karaf.shell/org.apache.karaf.shell.console/3.0.1 = 30
-mvn\:org.apache.karaf.jaas/org.apache.karaf.jaas.modules/3.0.1 = 30
-mvn\:org.apache.karaf.jaas/org.apache.karaf.jaas.config/3.0.1 = 30
-mvn\:org.apache.karaf.jaas/org.apache.karaf.jaas.boot/3.0.1 = 30
-mvn\:org.apache.sshd/sshd-core/0.9.0 = 30
-mvn\:org.apache.karaf.bundle/org.apache.karaf.bundle.command/3.0.1 = 30
-mvn\:org.apache.karaf.shell/org.apache.karaf.shell.table/3.0.1 = 30
-mvn\:org.apache.karaf.bundle/org.apache.karaf.bundle.core/3.0.1 = 30
-mvn\:org.apache.karaf.shell/org.apache.karaf.shell.help/3.0.1 = 30
-mvn\:org.apache.karaf.system/org.apache.karaf.system.core/3.0.1 = 30
-mvn\:org.apache.karaf.system/org.apache.karaf.system.command/3.0.1 = 30
-mvn\:org.apache.karaf.shell/org.apache.karaf.shell.commands/3.0.1 = 30
-mvn\:org.apache.aries.quiesce/org.apache.aries.quiesce.api/1.0.0 = 30
+++ /dev/null
-#
-# The properties defined in this file will be made available through system
-# properties at the very beginning of the Karaf's boot process.
-#
-
-# Use Equinox as default OSGi Framework Implementation
-karaf.framework=equinox
-
-# https://bugs.eclipse.org/bugs/show_bug.cgi?id=325578
-# Extend the framework to avoid the resources to be presented with
-# a URL of type bundleresource: but to be presented as file:
-osgi.hook.configurators.include=org.eclipse.virgo.kernel.equinox.extensions.hooks.ExtensionsHookConfigurator
-
-
-# Log level when the pax-logging service is not available
-# This level will only be used while the pax-logging service bundle
-# is not fully available.
-# To change log levels, please refer to the org.ops4j.pax.logging.cfg file
-# instead.
-org.ops4j.pax.logging.DefaultServiceLog.level = ERROR
-
-#
-# Name of this Karaf instance.
-#
-karaf.name = root
-
-#
-# Default repository where bundles will be loaded from before using
-# other Maven repositories. For the full Maven configuration, see
-# the org.ops4j.pax.url.mvn.cfg file.
-#
-karaf.default.repository = system
-
-#
-# Location of a shell script that will be run when starting a shell
-# session. This script can be used to create aliases and define
-# additional commands.
-#
-karaf.shell.init.script = ${karaf.etc}/shell.init.script
-
-#
-# Sets the maximum size of the shell command history. If not set,
-# defaults to 500 entries. Setting to 0 will disable history.
-#
-# karaf.shell.history.maxSize = 0
-
-#
-# Deletes the entire karaf.data directory at every start
-#
-karaf.clean.all = false
-
-#
-# Deletes the karaf.data/cache directory at every start
-#
-karaf.clean.cache = false
-
-#
-# Roles to use when logging into a local Karaf console.
-#
-# The syntax is the following:
-# [classname:]principal
-# where classname is the class name of the principal object
-# (defaults to org.apache.karaf.jaas.modules.RolePrincipal)
-# and principal is the name of the principal of that class
-# (defaults to instance).
-#
-karaf.local.roles = admin,manager,viewer
-
-#
-# Set this empty property to avoid errors when validating xml documents.
-#
-xml.catalog.files =
-
-#
-# Suppress the bell in the console when hitting backspace too many times
-# for example
-#
-jline.nobell = true
-
-#
-# ServiceMix specs options
-#
-org.apache.servicemix.specs.debug = false
-org.apache.servicemix.specs.timeout = 0
-
-#
-# Settings for the OSGi 4.3 Weaving
-# By default, we will not weave any classes. Change this setting to include classes
-# that you application needs to have woven.
-#
-org.apache.aries.proxy.weaving.enabled = none
-# Classes not to weave - Aries default + Xerces which is known to have issues.
-org.apache.aries.proxy.weaving.disabled = org.objectweb.asm.*,org.slf4j.*,org.apache.log4j.*,javax.*,org.apache.xerces.*
-
-#
-# By default, only Karaf shell commands are secured, but additional services can be
-# secured by expanding this filter
-#
-karaf.secured.services = (&(osgi.command.scope=*)(osgi.command.function=*))
-
-#
-# Security properties
-#
-# To enable OSGi security, uncomment the properties below,
-# install the framework-security feature and restart.
-#
-#java.security.policy=${karaf.etc}/all.policy
-#org.osgi.framework.security=osgi
-#org.osgi.framework.trust.repositories=${karaf.etc}/trustStore.ks
@Override
public void modifiedService(ServiceReference<BindingAwareBroker> reference, BindingAwareBroker service) {
- // TODO Auto-generated method stub
-
+ removedService(reference, service);
+ addingService(reference);
}
@Override
public void removedService(ServiceReference<BindingAwareBroker> reference, BindingAwareBroker service) {
- // TODO Auto-generated method stub
+ broker = context.getService(reference);
+ mdActivationPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ onBrokerRemoved(broker, context);
+ }
+ });
}
};
protected abstract void onBrokerAvailable(BindingAwareBroker broker, BundleContext context);
protected void onBrokerRemoved(BindingAwareBroker broker, BundleContext context) {
-
+ stopImpl(context);
}
}
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-parser-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-util</artifactId>
+ </dependency>
<dependency>
<groupId>xmlunit</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
</dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.1</version>
+ </dependency>
</dependencies>
</project>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.common.actor;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.BoundedMailbox;
+import akka.dispatch.MailboxType;
+import akka.dispatch.MessageQueue;
+import akka.dispatch.ProducesMessageQueue;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.opendaylight.controller.common.reporting.MetricsReporter;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<BoundedMailbox.MessageQueue> {
+
+ private MeteredMessageQueue queue;
+ private Integer capacity;
+ private FiniteDuration pushTimeOut;
+ private ActorPath actorPath;
+ private MetricsReporter reporter;
+
+ private final String QUEUE_SIZE = "queue-size";
+ private final Long DEFAULT_TIMEOUT = 10L;
+
+ public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+ Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
+ this.capacity = config.getInt("mailbox-capacity");
+ Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
+
+ Long timeout = -1L;
+ if ( config.hasPath("mailbox-push-timeout-time") ){
+ timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS);
+ } else {
+ timeout = DEFAULT_TIMEOUT;
+ }
+ Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0");
+ this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS);
+
+ reporter = MetricsReporter.getInstance();
+ }
+
+
+ @Override
+ public MessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+ this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
+ monitorQueueSize(owner, this.queue);
+ return this.queue;
+ }
+
+ private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+ if (owner.isEmpty()) {
+ return; //there's no actor to monitor
+ }
+ actorPath = owner.get().path();
+ MetricRegistry registry = reporter.getMetricsRegistry();
+
+ String actorName = registry.name(actorPath.toString(), QUEUE_SIZE);
+
+ if (registry.getMetrics().containsKey(actorName))
+ return; //already registered
+
+ reporter.getMetricsRegistry().register(actorName,
+ new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return monitoredQueue.size();
+ }
+ });
+ }
+
+
+ public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue {
+
+ public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+ super(capacity, pushTimeOut);
+ }
+ }
+
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.common.reporting;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Maintains metrics registry that is provided to reporters.
+ * At the moment only one reporter exists {@code JmxReporter}.
+ * More reporters can be added.
+ * <p/>
+ * The consumers of this class will only be interested in {@code MetricsRegistry}
+ * where metrics for that consumer gets stored.
+ */
+public class MetricsReporter implements AutoCloseable{
+
+ private final MetricRegistry METRICS_REGISTRY = new MetricRegistry();
+ private final String DOMAIN = "org.opendaylight.controller";
+
+ public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build();
+
+ private static MetricsReporter inst = new MetricsReporter();
+
+ private MetricsReporter(){
+ jmxReporter.start();
+ }
+
+ public static MetricsReporter getInstance(){
+ return inst;
+ }
+
+ public MetricRegistry getMetricsRegistry(){
+ return METRICS_REGISTRY;
+ }
+
+ @Override
+ public void close() throws Exception {
+ jmxReporter.close();
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.xml.codec;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.xml.codec;
import org.opendaylight.yangtools.yang.common.QName;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.xml.codec;
import com.google.common.base.Function;
import com.google.common.base.Objects;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.xml.codec;
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.xml.codec;
import com.google.common.base.Optional;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.common.actor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.DeadLetter;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.testkit.JavaTestKit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MeteredBoundedMailboxTest {
+
+ private static ActorSystem actorSystem;
+ private final ReentrantLock lock = new ReentrantLock();
+
+ @Before
+ public void setUp() throws Exception {
+ actorSystem = ActorSystem.create("testsystem");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (actorSystem != null)
+ actorSystem.shutdown();
+ }
+
+ @Test
+ public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException {
+ final JavaTestKit mockReceiver = new JavaTestKit(actorSystem);
+ actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
+
+
+ final FiniteDuration ONE_SEC = new FiniteDuration(1, TimeUnit.SECONDS);
+ String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
+ ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
+ "pingpongactor");
+
+ actorSystem.mailboxes().settings();
+ lock.lock();
+ //queue capacity = 10
+ //need to send 12 messages; 1 message is dequeued and actor waits on lock,
+ //2nd to 11th messages are put on the queue
+ //12th message is sent to dead letter.
+ for (int i=0;i<12;i++){
+ pingPongActor.tell("ping", mockReceiver.getRef());
+ }
+
+ mockReceiver.expectMsgClass(ONE_SEC, DeadLetter.class);
+
+ lock.unlock();
+
+ Object[] eleven = mockReceiver.receiveN(11, ONE_SEC);
+ }
+
+ /**
+ * For testing
+ */
+ public static class PingPongActor extends UntypedActor{
+
+ ReentrantLock lock;
+
+ private PingPongActor(ReentrantLock lock){
+ this.lock = lock;
+ }
+
+ public static Props props(final ReentrantLock lock){
+ return Props.create(new Creator<PingPongActor>(){
+ @Override
+ public PingPongActor create() throws Exception {
+ return new PingPongActor(lock);
+ }
+ });
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ lock.lock();
+ if ("ping".equals(message))
+ getSender().tell("pong", getSelf());
+ }
+ }
+}
\ No newline at end of file
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.remote.rpc.utils;
+package org.opendaylight.controller.xml.codec;
import com.google.common.collect.ImmutableList;
+testsystem {
+
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 10
+ mailbox-push-timeout-time = 100ms
+ }
+}
\ No newline at end of file
--- /dev/null
+testsystem {
+
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 10ms
+ }
+}
\ No newline at end of file
<version>1.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.1</version>
+ </dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
<Export-package></Export-package>
<Private-Package></Private-Package>
- <Import-Package>!*snappy;!org.jboss.*;*</Import-Package>
+ <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;*</Import-Package>
<Embed-Dependency>
sal-clustering-commons;
sal-akka-raft;
+ *metrics*;
!sal*;
!*config-api*;
!*testkit*;
Logging.getLogger(getContext().system(), this);
- public AbstractUntypedActor(){
+ public AbstractUntypedActor() {
LOG.debug("Actor created {}", getSelf());
getContext().
system().
@Override public void onReceive(Object message) throws Exception {
LOG.debug("Received message {}", message.getClass().getSimpleName());
handleReceive(message);
- LOG.debug("Done handling message {}", message.getClass().getSimpleName());
+ LOG.debug("Done handling message {}",
+ message.getClass().getSimpleName());
}
protected abstract void handleReceive(Object message) throws Exception;
- protected void ignoreMessage(Object message){
+ protected void ignoreMessage(Object message) {
LOG.debug("Unhandled message {} ", message);
}
- protected void unknownMessage(Object message) throws Exception{
+ protected void unknownMessage(Object message) throws Exception {
+ LOG.debug("Received unhandled message {}", message);
unhandled(message);
}
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.PropertyUtils;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
*/
public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable {
- private static final Logger
- LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-
- private static final String EXECUTOR_MAX_POOL_SIZE_PROP =
- "mdsal.dist-datastore-executor-pool.size";
- private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10;
-
- private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.dist-datastore-executor-queue.size";
- private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
private SchemaContext schemaContext;
- /**
- * Executor used to run FutureTask's
- *
- * This is typically used when we need to make a request to an actor and
- * wait for it's response and the consumer needs to be provided a Future.
- */
- private final ListeningExecutorService executor =
- MoreExecutors.listeningDecorator(
- SpecialExecutors.newBlockingBoundedFastThreadPool(
- PropertyUtils.getIntSystemProperty(
- EXECUTOR_MAX_POOL_SIZE_PROP,
- DEFAULT_EXECUTOR_MAX_POOL_SIZE),
- PropertyUtils.getIntSystemProperty(
- EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
-
- public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
+ public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+ Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
LOG.info("Creating ShardManager : {}", shardManagerId);
this.actorContext = new ActorContext(actorSystem, actorSystem
- .actorOf(ShardManager.props(type, cluster, configuration),
+ .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties),
shardManagerId ), cluster, configuration);
}
}
+ @SuppressWarnings("unchecked")
@Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ ListenerRegistration<L> registerChangeListener(
YangInstanceIdentifier path, L listener,
AsyncDataBroker.DataChangeScope scope) {
Preconditions.checkNotNull(path, "path should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
-
LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
Object result = actorContext.executeLocalShardOperation(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(),
- scope),
- ActorContext.ASK_DURATION
- );
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ ActorContext.ASK_DURATION);
if (result != null) {
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
LOG.debug(
"No local shard for shardName {} was found so returning a noop registration",
shardName);
+
return new NoOpDataChangeListenerRegistration(listener);
}
-
-
-
-
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(actorContext, executor, schemaContext);
+ return new TransactionChainProxy(actorContext, schemaContext);
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
- executor, schemaContext);
+ schemaContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
- executor, schemaContext);
+ schemaContext);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
- executor, schemaContext);
+ schemaContext);
}
@Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
+
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
public class DistributedDataStoreFactory {
- public static DistributedDataStore createInstance(String name, SchemaService schemaService){
+ public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+
ActorSystem actorSystem = ActorSystemFactory.getInstance();
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),config );
- ShardStrategyFactory.setConfiguration(config);
- schemaService
- .registerSchemaContextListener(dataStore);
+ new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ config, dataStoreProperties );
+ ShardStrategyFactory.setConfiguration(config);
+ schemaService.registerSchemaContextListener(dataStore);
return dataStore;
-
}
}
import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
- private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses) {
+ private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
this.name = name;
LOG.info("Shard created : {} persistent : {}", name, persistent);
- store = InMemoryDOMDataStoreFactory.create(name.toString(), null);
+ store = InMemoryDOMDataStoreFactory.create(name.toString(), null, dataStoreProperties);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
}
- private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> peerAddresses){
- Map<String , String> map = new HashMap<>();
+ private static Map<String, String> mapPeerAddresses(
+ Map<ShardIdentifier, String> peerAddresses) {
+ Map<String, String> map = new HashMap<>();
- for(Map.Entry<ShardIdentifier, String> entry : peerAddresses.entrySet()){
+ for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
+ .entrySet()) {
map.put(entry.getKey().toString(), entry.getValue());
}
public static Props props(final ShardIdentifier name,
- final Map<ShardIdentifier, String> peerAddresses) {
+ final Map<ShardIdentifier, String> peerAddresses,
+ final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(name, "name should not be null");
- Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+ Preconditions
+ .checkNotNull(peerAddresses, "peerAddresses should not be null");
return Props.create(new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(name, peerAddresses);
+ return new Shard(name, peerAddresses, dataStoreProperties);
}
});
}
} else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
} else {
super.onReceiveCommand(message);
}
}
private ActorRef createTypedTransactionActor(
- CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) {
+ CreateTransaction createTransaction,
+ ShardTransactionIdentifier transactionId) {
if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
.props(store.newWriteOnlyTransaction(), getSelf(),
schemaContext), transactionId.toString());
} else {
- // FIXME: This does not seem right
throw new IllegalArgumentException(
- "CreateTransaction message has unidentified transaction type="
+ "Shard="+name + ":CreateTransaction message has unidentified transaction type="
+ createTransaction.getTransactionType());
}
}
private void createTransaction(CreateTransaction createTransaction) {
- ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build();
+ ShardTransactionIdentifier transactionId =
+ ShardTransactionIdentifier.builder()
+ .remoteTransactionId(createTransaction.getTransactionId())
+ .build();
LOG.debug("Creating transaction : {} ", transactionId);
ActorRef transactionActor =
createTypedTransactionActor(createTransaction, transactionId);
getSender()
.tell(new CreateTransactionReply(
- Serialization.serializedActorPath(transactionActor),
- createTransaction.getTransactionId()).toSerializable(),
+ Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(),
getSelf());
}
final ListenableFuture<Void> future = cohort.commit();
final ActorRef self = getSelf();
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender
- .tell(new CommitTransactionReply().toSerializable(),
- self);
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(new Date());
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- sender.tell(new akka.actor.Status.Failure(e),self);
- }
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ public void onSuccess(Void v) {
+ sender.tell(new CommitTransactionReply().toSerializable(),self);
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(new Date());
}
- }, getContext().dispatcher());
+
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during commit");
+ shardMBean.incrementFailedTransactionsCount();
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ });
+
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
LOG.debug(
"registerDataChangeListener sending reply, listenerRegistrationPath = {} "
- , listenerRegistration.path().toString());
+ , listenerRegistration.path().toString());
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
// Update stats
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
- if(lastLogEntry != null){
+ if (lastLogEntry != null) {
shardMBean.setLastLogIndex(lastLogEntry.getIndex());
shardMBean.setLastLogTerm(lastLogEntry.getTerm());
}
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
private ShardManagerInfoMBean mBean;
+ private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
- private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ this.dataStoreProperties = dataStoreProperties;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(final String type,
final ClusterWrapper cluster,
- final Configuration configuration) {
+ final Configuration configuration,
+ final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
@Override
public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration);
+ return new ShardManager(type, cluster, configuration, dataStoreProperties);
}
});
}
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardId, peerAddresses),
+ .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
shardId.toString());
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
@Override
public SupervisorStrategy supervisorStrategy() {
+
return new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
+ LOG.warning("Supervisor Strategy of resume applied {}",t);
return SupervisorStrategy.resume();
}
}
} else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
} else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction,DeleteData.fromSerizalizable(message));
+ deleteData(transaction,DeleteData.fromSerializable(message));
} else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
readyTransaction(transaction,new ReadyTransaction());
} else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
} else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
} else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction,DeleteData.fromSerizalizable(message));
+ deleteData(transaction,DeleteData.fromSerializable(message));
} else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
readyTransaction(transaction,new ReadyTransaction());
}else {
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import java.util.concurrent.ExecutionException;
-
public class ThreePhaseCommitCohort extends AbstractUntypedActor {
private final DOMStoreThreePhaseCommitCohort cohort;
private final ActorRef shardActor;
@Override
public void handleReceive(Object message) throws Exception {
- if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ if (message.getClass()
+ .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
canCommit(new CanCommitTransaction());
- } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (message.getClass()
+ .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
preCommit(new PreCommitTransaction());
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (message.getClass()
+ .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
commit(new CommitTransaction());
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ } else if (message.getClass()
+ .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
abort(new AbortTransaction());
} else {
unknownMessage(message);
final ActorRef sender = getSender();
final ActorRef self = getSelf();
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new AbortTransactionReply().toSerializable(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when aborting");
- }
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ public void onSuccess(Void v) {
+ sender
+ .tell(new AbortTransactionReply().toSerializable(),
+ self);
+ }
+
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during abort");
+ sender
+ .tell(new akka.actor.Status.Failure(t), getSelf());
}
- }, getContext().dispatcher());
+ });
}
private void commit(CommitTransaction message) {
final ListenableFuture<Void> future = cohort.preCommit();
final ActorRef sender = getSender();
final ActorRef self = getSelf();
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ public void onSuccess(Void v) {
+ sender
+ .tell(new PreCommitTransactionReply().toSerializable(),
+ self);
+ }
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new PreCommitTransactionReply().toSerializable(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when preCommitting");
- }
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during pre-commit");
+ sender
+ .tell(new akka.actor.Status.Failure(t), getSelf());
}
- }, getContext().dispatcher());
+ });
}
final ListenableFuture<Boolean> future = cohort.canCommit();
final ActorRef sender = getSender();
final ActorRef self = getSelf();
+ Futures.addCallback(future, new FutureCallback<Boolean>() {
+ public void onSuccess(Boolean canCommit) {
+ sender.tell(new CanCommitTransactionReply(canCommit)
+ .toSerializable(), self);
+ }
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- Boolean canCommit = future.get();
- sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when checking canCommit");
- }
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during canCommit");
+ sender
+ .tell(new akka.actor.Status.Failure(t), getSelf());
}
- }, getContext().dispatcher());
+ });
+
}
}
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
*/
-public class ThreePhaseCommitCohortProxy implements
- DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
- private static final Logger
- LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- private final ListeningExecutorService executor;
private final String transactionId;
-
- public ThreePhaseCommitCohortProxy(ActorContext actorContext,
- List<ActorPath> cohortPaths,
- String transactionId,
- ListeningExecutorService executor) {
-
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
+ String transactionId) {
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
this.transactionId = transactionId;
- this.executor = executor;
}
- @Override public ListenableFuture<Boolean> canCommit() {
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
LOG.debug("txn {} canCommit", transactionId);
- Callable<Boolean> call = new Callable<Boolean>() {
+ Future<Iterable<Object>> combinedFuture =
+ invokeCohorts(new CanCommitTransaction().toSerializable());
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
- public Boolean call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
-
- Object message = new CanCommitTransaction().toSerializable();
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
-
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- message,
- ActorContext.ASK_DURATION);
-
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
- CanCommitTransactionReply reply =
- CanCommitTransactionReply.fromSerializable(response);
- if (!reply.getCanCommit()) {
- return false;
- }
+ public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+ if(failure != null) {
+ returnFuture.setException(failure);
+ return;
+ }
+
+ boolean result = true;
+ for(Object response: responses) {
+ if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ CanCommitTransactionReply reply =
+ CanCommitTransactionReply.fromSerializable(response);
+ if (!reply.getCanCommit()) {
+ result = false;
+ break;
}
- } catch(RuntimeException e){
- // FIXME : Need to properly handle this
- LOG.error("Unexpected Exception", e);
- return false;
+ } else {
+ LOG.error("Unexpected response type {}", response.getClass());
+ returnFuture.setException(new IllegalArgumentException(
+ String.format("Unexpected response type {}", response.getClass())));
+ return;
}
}
- return true;
+ returnFuture.set(Boolean.valueOf(result));
}
- };
+ }, actorContext.getActorSystem().dispatcher());
+
+ return returnFuture;
+ }
+
+ private Future<Iterable<Object>> invokeCohorts(Object message) {
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
+ for(ActorPath actorPath : cohortPaths) {
+
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
- return executor.submit(call);
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
+ ActorContext.ASK_DURATION));
+ }
+
+ return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
}
- @Override public ListenableFuture<Void> preCommit() {
+ @Override
+ public ListenableFuture<Void> preCommit() {
LOG.debug("txn {} preCommit", transactionId);
- return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
+ return voidOperation(new PreCommitTransaction().toSerializable(),
+ PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
}
- @Override public ListenableFuture<Void> abort() {
+ @Override
+ public ListenableFuture<Void> abort() {
LOG.debug("txn {} abort", transactionId);
- return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ // Note - we pass false for propagateException. In the front-end data broker, this method
+ // is called when one of the 3 phases fails with an exception. We'd rather have that
+ // original exception propagated to the client. If our abort fails and we propagate the
+ // exception then that exception will supersede and suppress the original exception. But
+ // it's the original exception that is the root cause and of more interest to the client.
+
+ return voidOperation(new AbortTransaction().toSerializable(),
+ AbortTransactionReply.SERIALIZABLE_CLASS, false);
}
- @Override public ListenableFuture<Void> commit() {
+ @Override
+ public ListenableFuture<Void> commit() {
LOG.debug("txn {} commit", transactionId);
- return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
+ return voidOperation(new CommitTransaction().toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true);
}
- private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
- Callable<Void> call = new Callable<Void>() {
-
- @Override public Void call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- message,
- ActorContext.ASK_DURATION);
-
- if (response != null && !response.getClass()
- .equals(expectedResponseClass)) {
- throw new RuntimeException(
- String.format(
- "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
- expectedResponseClass.toString(),
- response.getClass().toString())
- );
+ private ListenableFuture<Void> voidOperation(final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
+ final SettableFuture<Void> returnFuture = SettableFuture.create();
+
+ combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+
+ Throwable exceptionToPropagate = failure;
+ if(exceptionToPropagate == null) {
+ for(Object response: responses) {
+ if(!response.getClass().equals(expectedResponseClass)) {
+ exceptionToPropagate = new IllegalArgumentException(
+ String.format("Unexpected response type {}",
+ response.getClass()));
+ break;
}
- } catch(TimeoutException e){
- LOG.error(String.format("A timeout occurred when processing operation : %s", message));
}
}
- return null;
+
+ if(exceptionToPropagate != null) {
+ if(propagateException) {
+ // We don't log the exception here to avoid redundant logging since we're
+ // propagating to the caller in MD-SAL core who will log it.
+ returnFuture.setException(exceptionToPropagate);
+ } else {
+ // Since the caller doesn't want us to propagate the exception we'll also
+ // not log it normally. But it's usually not good to totally silence
+ // exceptions so we'll log it to debug level.
+ LOG.debug(String.format("%s failed", message.getClass().getSimpleName()),
+ exceptionToPropagate);
+ returnFuture.set(null);
+ }
+ } else {
+ returnFuture.set(null);
+ }
}
- };
+ }, actorContext.getActorSystem().dispatcher());
- return executor.submit(call);
+ return returnFuture;
}
public List<ActorPath> getCohortPaths() {
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
- private final ListeningExecutorService transactionExecutor;
private final SchemaContext schemaContext;
- public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor,
- SchemaContext schemaContext) {
+ public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) {
this.actorContext = actorContext;
- this.transactionExecutor = transactionExecutor;
this.schemaContext = schemaContext;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext);
+ TransactionProxy.TransactionType.READ_ONLY, schemaContext);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext);
+ TransactionProxy.TransactionType.WRITE_ONLY, schemaContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext);
+ TransactionProxy.TransactionType.READ_WRITE, schemaContext);
}
@Override
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
/**
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
private final TransactionIdentifier identifier;
- private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
+ private boolean inReadyState;
- public TransactionProxy(
- ActorContext actorContext,
- TransactionType transactionType,
- ListeningExecutorService executor,
- SchemaContext schemaContext
- ) {
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
+ SchemaContext schemaContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
- this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
memberName = "UNKNOWN-MEMBER";
}
- this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
+
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
+ counter.getAndIncrement()).build();
LOG.debug("Created txn {}", identifier);
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
+ Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+ "Read operation on write-only transaction is not allowed");
+
LOG.debug("txn {} read {}", identifier, path);
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
}
- @Override public CheckedFuture<Boolean, ReadFailedException> exists(
- YangInstanceIdentifier path) {
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
+
+ Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+ "Exists operation on write-only transaction is not allowed");
+
LOG.debug("txn {} exists {}", identifier, path);
createTransactionIfMissing(actorContext, path);
return transactionContext(path).dataExists(path);
}
+ private void checkModificationState() {
+ Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+ "Modification operation on read-only transaction is not allowed");
+ Preconditions.checkState(!inReadyState,
+ "Transaction is sealed - further modifications are allowed");
+ }
+
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ checkModificationState();
+
LOG.debug("txn {} write {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ checkModificationState();
+
LOG.debug("txn {} merge {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public void delete(YangInstanceIdentifier path) {
+ checkModificationState();
+
LOG.debug("txn {} delete {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public DOMStoreThreePhaseCommitCohort ready() {
+
+ checkModificationState();
+
+ inReadyState = true;
+
List<ActorPath> cohortPaths = new ArrayList<>();
- LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+ LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+ remoteTransactionPaths.size());
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+ LOG.debug("txn {} Readying transaction for shard {}", identifier,
+ transactionContext.getShardName());
Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
- String resolvedCohortPath = transactionContext
- .getResolvedCohortPath(reply.getCohortPath().toString());
+ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+ actorContext.getActorSystem(),result);
+ String resolvedCohortPath = transactionContext.getResolvedCohortPath(
+ reply.getCohortPath().toString());
cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
+ } else {
+ LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
+ result.getClass());
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
}
@Override
Object response = actorContext.executeShardOperation(shardName,
new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
- if (response.getClass()
- .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+ if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
transactionActor);
remoteTransactionPaths.put(shardName, transactionContext);
+ } else {
+ LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
+ response.getClass());
}
- } catch(TimeoutException | PrimaryNotFoundException e){
+ } catch(Exception e){
LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
- remoteTransactionPaths.put(shardName,
- new NoOpTransactionContext(shardName));
+ remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
}
}
this.actor = actor;
}
- @Override public String getShardName() {
+ @Override
+ public String getShardName() {
return shardName;
}
return actor;
}
- @Override public String getResolvedCohortPath(String cohortPath) {
+ @Override
+ public String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
- @Override public void closeTransaction() {
- getActor().tell(
- new CloseTransaction().toSerializable(), null);
+ @Override
+ public void closeTransaction() {
+ actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
- @Override public Object readyTransaction() {
+ @Override
+ public Object readyTransaction() {
return actorContext.executeRemoteOperation(getActor(),
- new ReadyTransaction().toSerializable(),
- ActorContext.ASK_DURATION
- );
-
+ new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
}
- @Override public void deleteData(YangInstanceIdentifier path) {
- getActor().tell(new DeleteData(path).toSerializable(), null);
+ @Override
+ public void deleteData(YangInstanceIdentifier path) {
+ actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
}
- @Override public void mergeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- getActor()
- .tell(new MergeData(path, data, schemaContext).toSerializable(),
- null);
+ @Override
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ actorContext.sendRemoteOperationAsync(getActor(),
+ new MergeData(path, data, schemaContext).toSerializable());
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path) {
- Callable<Optional<NormalizedNode<?, ?>>> call =
- new Callable<Optional<NormalizedNode<?, ?>>>() {
-
- @Override public Optional<NormalizedNode<?, ?>> call()
- throws Exception {
- Object response = actorContext
- .executeRemoteOperation(getActor(),
- new ReadData(path).toSerializable(),
- ActorContext.ASK_DURATION);
- if (response.getClass()
- .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- ReadDataReply reply = ReadDataReply
- .fromSerializable(schemaContext, path,
- response);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if(failure != null) {
+ returnFuture.setException(new ReadFailedException(
+ "Error reading data for path " + path, failure));
+ } else {
+ if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
+ path, response);
if (reply.getNormalizedNode() == null) {
- return Optional.absent();
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
+ } else {
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
+ reply.getNormalizedNode()));
}
- return Optional.<NormalizedNode<?, ?>>of(
- reply.getNormalizedNode());
+ } else {
+ returnFuture.setException(new ReadFailedException(
+ "Invalid response reading data for path " + path));
}
-
- throw new ReadFailedException("Read Failed " + path);
}
- };
+ }
+ };
- return MappingCheckedFuture
- .create(executor.submit(call), ReadFailedException.MAPPER);
- }
+ Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+ future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
- @Override public void writeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- getActor()
- .tell(new WriteData(path, data, schemaContext).toSerializable(),
- null);
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
- @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
- final YangInstanceIdentifier path) {
-
- Callable<Boolean> call = new Callable<Boolean>() {
-
- @Override public Boolean call() throws Exception {
- Object o = actorContext.executeRemoteOperation(getActor(),
- new DataExists(path).toSerializable(),
- ActorContext.ASK_DURATION
- );
-
+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ actorContext.sendRemoteOperationAsync(getActor(),
+ new WriteData(path, data, schemaContext).toSerializable());
+ }
- if (DataExistsReply.SERIALIZABLE_CLASS
- .equals(o.getClass())) {
- return DataExistsReply.fromSerializable(o).exists();
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ final YangInstanceIdentifier path) {
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if(failure != null) {
+ returnFuture.setException(new ReadFailedException(
+ "Error checking exists for path " + path, failure));
+ } else {
+ if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+ returnFuture.set(Boolean.valueOf(DataExistsReply.
+ fromSerializable(response).exists()));
+ } else {
+ returnFuture.setException(new ReadFailedException(
+ "Invalid response checking exists for path " + path));
+ }
}
-
- throw new ReadFailedException("Exists Failed " + path);
}
};
- return MappingCheckedFuture
- .create(executor.submit(call), ReadFailedException.MAPPER);
+
+ Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+ future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
}
LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final String shardName;
+ private final Exception failure;
private ActorRef cohort;
- public NoOpTransactionContext(String shardName){
+ public NoOpTransactionContext(String shardName, Exception failure){
this.shardName = shardName;
+ this.failure = failure;
}
- @Override public String getShardName() {
+
+ @Override
+ public String getShardName() {
return shardName;
}
- @Override public String getResolvedCohortPath(String cohortPath) {
+ @Override
+ public String getResolvedCohortPath(String cohortPath) {
return cohort.path().toString();
}
- @Override public void closeTransaction() {
+ @Override
+ public void closeTransaction() {
LOG.warn("txn {} closeTransaction called", identifier);
}
return new ReadyTransactionReply(cohort.path()).toSerializable();
}
- @Override public void deleteData(YangInstanceIdentifier path) {
+ @Override
+ public void deleteData(YangInstanceIdentifier path) {
LOG.warn("txt {} deleteData called path = {}", identifier, path);
}
- @Override public void mergeData(YangInstanceIdentifier path,
+ @Override
+ public void mergeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
LOG.warn("txn {} mergeData called path = {}", identifier, path);
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
LOG.warn("txn {} readData called path = {}", identifier, path);
- return Futures.immediateCheckedFuture(
- Optional.<NormalizedNode<?, ?>>absent());
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException(
+ "Error reading data for path " + path, failure));
}
@Override public void writeData(YangInstanceIdentifier path,
@Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
LOG.warn("txn {} dataExists called path = {}", identifier, path);
-
- // Returning false instead of an exception to keep this aligned with
- // read
- return Futures.immediateCheckedFuture(false);
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException(
+ "Error checking exists for path " + path, failure));
}
}
.setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build();
}
- public static DeleteData fromSerizalizable(Object serializable){
+ public static DeleteData fromSerializable(Object serializable){
ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
}
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.util.Timeout;
+
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
- private SchemaContext schemaContext = null;
-
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
Configuration configuration) {
}
}
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @param duration the maximum amount of time to send he message
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
+ FiniteDuration duration) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+ return ask(actor, message, new Timeout(duration));
+ }
+
+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+ actor.tell(message, ActorRef.noSender());
+ }
+
/**
* Execute an operation on the primary for a given shard
* <p>
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
public class DistributedConfigDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return DistributedDataStoreFactory
- .createInstance("config", getConfigSchemaServiceDependency());
+ return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxShardDataChangeExecutorPoolSize(),
+ getMaxShardDataChangeExecutorQueueSize(),
+ getMaxShardDataChangeListenerQueueSize()));
}
}
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
public class DistributedOperationalDataStoreProviderModule extends
org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return DistributedDataStoreFactory
- .createInstance("operational", getOperationalSchemaServiceDependency());
+ return DistributedDataStoreFactory.createInstance("operational",
+ getOperationalSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxShardDataChangeExecutorPoolSize(),
+ getMaxShardDataChangeExecutorQueueSize(),
+ getMaxShardDataChangeListenerQueueSize()));
}
}
case distributed-config-datastore-provider {
when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'";
container config-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
- }
- }
- }
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
+ }
+ }
+
+ leaf max-shard-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change listeners.";
+ }
}
}
// Augments the 'configuration' choice node under modules/module.
- augment "/config:modules/config:module/config:configuration" {
- case distributed-operational-datastore-provider {
- when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'";
+ augment "/config:modules/config:module/config:configuration" {
+ case distributed-operational-datastore-provider {
+ when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'";
container operational-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
- }
- }
- }
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
+ }
+ }
+
+ leaf max-shard-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for each shard's data store data change notification executor.";
+ }
+
+ leaf max-shard-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for each shard's data store data change listeners.";
+ }
}
}
}
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef shard = getSystem().actorOf(props);
new Within(duration("5 seconds")) {
protected void run() {
try {
final DistributedDataStore distributedDataStore =
- new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
+ new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration, null);
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
try {
final DistributedDataStore distributedDataStore =
new DistributedDataStore(getSystem(), "config",
- new MockClusterWrapper(), configuration);
+ new MockClusterWrapper(), configuration, null);
distributedDataStore.onGlobalContextUpdated(
SchemaContextHelper.full());
ActorSystem actorSystem = mock(ActorSystem.class);
new DistributedDataStore(actorSystem, "config",
- mock(ClusterWrapper.class), mock(Configuration.class));
+ mock(ClusterWrapper.class), mock(Configuration.class), null);
verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
}
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", mockClusterWrapper,
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
new JavaTestKit(system) {{
final Props props = ShardManager
.props("config", new MockClusterWrapper(),
- new MockConfiguration());
+ new MockConfiguration(), null);
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransactionChain");
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef subject =
getSystem().actorOf(props, "testRegisterChangeListener");
ShardIdentifier.builder().memberName("member-1")
.shardName("inventory").type("config").build();
- final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransaction");
.shardName("inventory").type("config").build();
peerAddresses.put(identifier, null);
- final Props props = Shard.props(identifier, peerAddresses);
+ final Props props = Shard.props(identifier, peerAddresses, null);
final ActorRef subject =
getSystem().actorOf(props, "testPeerAddressResolved");
/**
* Covers negative test cases
+ *
* @author Basheeruddin Ahmed <syedbahm@cisco.com>
*/
public class ShardTransactionFailureTest extends AbstractActorTest {
private static final ShardIdentifier SHARD_IDENTIFIER =
ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ .shardName("inventory").type("operational").build();
static {
store.onGlobalContextUpdated(testSchemaContext);
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard,
TestModel.createTestContext());
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
}
-
-
}
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
@Test
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
@Test
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+ Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
final ActorRef subject =
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
try {
- final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
final TestActorRef subject = TestActorRef.apply(props,getSystem());
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+
+public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
+
+ private static ListeningExecutorService storeExecutor =
+ MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+ private static final InMemoryDOMDataStore store =
+ new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
+
+ private static final SchemaContext testSchemaContext =
+ TestModel.createTestContext();
+
+ private static final ShardIdentifier SHARD_IDENTIFIER =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ static {
+ store.onGlobalContextUpdated(testSchemaContext);
+ }
+
+ private FiniteDuration ASK_RESULT_DURATION = Duration.create(3000, TimeUnit.MILLISECONDS);
+
+
+ @Test(expected = TestException.class)
+ public void testNegativeAbortResultsInException() throws Exception {
+
+ final ActorRef shard =
+ getSystem()
+ .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+ .mock(DOMStoreThreePhaseCommitCohort.class);
+ final CompositeModification mockComposite =
+ Mockito.mock(CompositeModification.class);
+ final Props props =
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+ final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeAbortResultsInException");
+
+ when(mockCohort.abort()).thenReturn(
+ Futures.<Void>immediateFailedFuture(new TestException()));
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject,
+ ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
+ .build(), 3000);
+ assertTrue(future.isCompleted());
+
+ Await.result(future, ASK_RESULT_DURATION);
+
+
+
+ }
+
+
+ @Test(expected = OptimisticLockFailedException.class)
+ public void testNegativeCanCommitResultsInException() throws Exception {
+
+ final ActorRef shard =
+ getSystem()
+ .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+ .mock(DOMStoreThreePhaseCommitCohort.class);
+ final CompositeModification mockComposite =
+ Mockito.mock(CompositeModification.class);
+ final Props props =
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+ final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeCanCommitResultsInException");
+
+ when(mockCohort.canCommit()).thenReturn(
+ Futures
+ .<Boolean>immediateFailedFuture(
+ new OptimisticLockFailedException("some exception")));
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject,
+ ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
+ .build(), 3000);
+
+
+ Await.result(future, ASK_RESULT_DURATION);
+
+ }
+
+
+ @Test(expected = TestException.class)
+ public void testNegativePreCommitResultsInException() throws Exception {
+
+ final ActorRef shard =
+ getSystem()
+ .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+ .mock(DOMStoreThreePhaseCommitCohort.class);
+ final CompositeModification mockComposite =
+ Mockito.mock(CompositeModification.class);
+ final Props props =
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+ final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativePreCommitResultsInException");
+
+ when(mockCohort.preCommit()).thenReturn(
+ Futures
+ .<Void>immediateFailedFuture(
+ new TestException()));
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject,
+ ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
+ .build(), 3000);
+
+ Await.result(future, ASK_RESULT_DURATION);
+
+ }
+
+ @Test(expected = TestException.class)
+ public void testNegativeCommitResultsInException() throws Exception {
+
+ final TestActorRef<Shard> subject = TestActorRef
+ .create(getSystem(),
+ Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
+ "testNegativeCommitResultsInException");
+
+ final ActorRef shardTransaction =
+ getSystem().actorOf(
+ ShardTransaction.props(store.newReadWriteTransaction(), subject,
+ TestModel.createTestContext()));
+
+ ShardTransactionMessages.WriteData writeData =
+ ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()).setNormalizedNode(
+ NormalizedNodeMessages.Node.newBuilder().build()
+
+ ).build();
+
+ //This is done so that Modification list is updated which is used during commit
+ Future future =
+ akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+
+ //ready transaction creates the cohort so that we get into the
+ //block where in commmit is done
+ ShardTransactionMessages.ReadyTransaction readyTransaction =
+ ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+ future =
+ akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+
+ //but when the message is sent it will have the MockCommit object
+ //so that we can simulate throwing of exception
+ ForwardedCommitTransaction mockForwardCommitTransaction =
+ Mockito.mock(ForwardedCommitTransaction.class);
+ DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
+ Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
+ when(mockForwardCommitTransaction.getCohort())
+ .thenReturn(mockThreePhaseCommitTransaction);
+ when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
+ .<Void>immediateFailedFuture(
+ new TestException()));
+ Modification mockModification = Mockito.mock(
+ Modification.class);
+ when(mockForwardCommitTransaction.getModification())
+ .thenReturn(mockModification);
+
+ when(mockModification.toSerializable()).thenReturn(
+ PersistentMessages.CompositeModification.newBuilder().build());
+
+ future =
+ akka.pattern.Patterns.ask(subject,
+ mockForwardCommitTransaction
+ , 3000);
+ Await.result(future, ASK_RESULT_DURATION);
+
+
+ }
+
+ private class TestException extends Exception {
+ }
+
+
+}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.Futures;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import junit.framework.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Stubber;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import scala.concurrent.duration.FiniteDuration;
-import java.util.Arrays;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertNotNull;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
- private ThreePhaseCommitCohortProxy proxy;
- private Props props;
- private ActorRef actorRef;
- private MockActorContext actorContext;
- private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
- Executors.newSingleThreadExecutor());
+ @Mock
+ private ActorContext actorContext;
@Before
- public void setUp(){
- props = Props.create(MessageCollectorActor.class);
- actorRef = getSystem().actorOf(props);
- actorContext = new MockActorContext(this.getSystem());
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
- proxy =
- new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(actorRef.path()), "txn-1", executor);
+ doReturn(getSystem()).when(actorContext).getActorSystem();
+ }
+ private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
+ List<ActorPath> cohorts = Lists.newArrayList();
+ for(int i = 1; i <= nCohorts; i++) {
+ ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
+ cohorts.add(path);
+ doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ }
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
}
- @After
- public void tearDown() {
- executor.shutdownNow();
+ private void setupMockActorContext(Class<?> requestType, Object... responses) {
+ Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
+ .failed((Throwable) responses[0]) : Futures
+ .successful(((SerializableMessage) responses[0]).toSerializable()));
+
+ for(int i = 1; i < responses.length; i++) {
+ stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
+ .failed((Throwable) responses[i]) : Futures
+ .successful(((SerializableMessage) responses[i]).toSerializable()));
+ }
+
+ stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
+ isA(requestType), any(FiniteDuration.class));
+ }
+
+ private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
+ verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
+ any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+ }
+
+ @Test
+ public void testCanCommitWithOneCohort() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true));
+
+ ListenableFuture<Boolean> future = proxy.canCommit();
+
+ assertEquals("canCommit", true, future.get());
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(false));
+
+ future = proxy.canCommit();
+
+ assertEquals("canCommit", false, future.get());
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
@Test
- public void testCanCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
+ public void testCanCommitWithMultipleCohorts() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
ListenableFuture<Boolean> future = proxy.canCommit();
- Assert.assertTrue(future.get().booleanValue());
+ assertEquals("canCommit", true, future.get());
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test
+ public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(3);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
+ new CanCommitTransactionReply(true));
+
+ ListenableFuture<Boolean> future = proxy.canCommit();
+
+ assertEquals("canCommit", false, future.get());
+
+ verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testCanCommitWithExceptionFailure() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+
+ proxy.canCommit().get();
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testCanCommitWithInvalidResponseType() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply());
+
+ proxy.canCommit().get();
}
@Test
public void testPreCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable());
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- ListenableFuture<Void> future = proxy.preCommit();
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply());
- future.get();
+ proxy.preCommit().get();
+ verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testPreCommitWithFailure() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply(), new RuntimeException("mock"));
+
+ proxy.preCommit().get();
}
@Test
public void testAbort() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable());
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- ListenableFuture<Void> future = proxy.abort();
+ setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
- future.get();
+ proxy.abort().get();
+ verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test
+ public void testAbortWithFailure() throws Exception {
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+
+ // The exception should not get propagated.
+ proxy.abort().get();
+
+ verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
@Test
public void testCommit() throws Exception {
- actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable());
- ListenableFuture<Void> future = proxy.commit();
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+ new CommitTransactionReply());
+
+ proxy.commit().get();
+
+ verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testCommitWithFailure() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- future.get();
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
+ new RuntimeException("mock"));
+
+ proxy.commit().get();
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void teseCommitWithInvalidResponseType() throws Exception {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(1);
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
+
+ proxy.commit().get();
}
@Test
- public void testGetCohortPaths() throws Exception {
- assertNotNull(proxy.getCohortPaths());
+ public void testGetCohortPaths() {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxy(2);
+
+ List<ActorPath> paths = proxy.getCohortPaths();
+ assertNotNull("getCohortPaths returned null", paths);
+ assertEquals("getCohortPaths size", 2, paths.size());
}
}
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import akka.actor.ActorPath;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.Futures;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.junit.After;
+
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import java.util.List;
-import java.util.concurrent.Executors;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
-import static junit.framework.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.isA;
+
+@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
+ static interface Invoker {
+ void invoke(TransactionProxy proxy) throws Exception;
+ }
+
private final Configuration configuration = new MockConfiguration();
- private final ActorContext testContext =
- new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
+ @Mock
+ private ActorContext mockActorContext;
- private final ListeningExecutorService transactionExecutor =
- MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ private SchemaContext schemaContext;
+
+ String memberName = "mock-member";
@Before
public void setUp(){
- ShardStrategyFactory.setConfiguration(configuration);
- }
+ MockitoAnnotations.initMocks(this);
- @After
- public void tearDown() {
- transactionExecutor.shutdownNow();
- }
+ schemaContext = TestModel.createTestContext();
- @Test
- public void testRead() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ doReturn(getSystem()).when(mockActorContext).getActorSystem();
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ ShardStrategyFactory.setConfiguration(configuration);
+ }
+ private CreateTransaction eqCreateTransaction(final String memberName,
+ final TransactionType type) {
+ ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
+ @Override
+ public boolean matches(Object argument) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+ };
+
+ return argThat(matcher);
+ }
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ private DataExists eqDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ DataExists obj = DataExists.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+ return argThat(matcher);
+ }
- actorContext.setExecuteRemoteOperationResponse(
- new ReadDataReply(TestModel.createTestContext(), null)
- .toSerializable());
+ private ReadData eqReadData() {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ ReadData obj = ReadData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH);
+ }
+ };
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
+ return argThat(matcher);
+ }
- Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+ private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ WriteData obj = WriteData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
- Assert.assertFalse(normalizedNodeOptional.isPresent());
+ private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+ @Override
+ public boolean matches(Object argument) {
+ MergeData obj = MergeData.fromSerializable(argument, schemaContext);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ };
+
+ return argThat(matcher);
+ }
- actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
- TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
+ private DeleteData eqDeleteData() {
+ ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ DeleteData obj = DeleteData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH);
+ }
+ };
- read = transactionProxy.read(TestModel.TEST_PATH);
+ return argThat(matcher);
+ }
- normalizedNodeOptional = read.get();
+ private Object readyTxReply(ActorPath path) {
+ return new ReadyTransactionReply(path).toSerializable();
+ }
- Assert.assertTrue(normalizedNodeOptional.isPresent());
+ private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
+ return Futures.successful(new ReadDataReply(schemaContext, data)
+ .toSerializable());
}
- @Test
- public void testExists() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ private Future<Object> dataExistsReply(boolean exists) {
+ return Futures.successful(new DataExistsReply(exists).toSerializable());
+ }
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ private ActorSelection actorSelection(ActorRef actorRef) {
+ return getSystem().actorSelection(actorRef.path());
+ }
+ private FiniteDuration anyDuration() {
+ return any(FiniteDuration.class);
+ }
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ return CreateTransactionReply.newBuilder()
+ .setTransactionActorPath(actorRef.path().toString())
+ .setTransactionId("txn-1").build();
+ }
+ private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+ doReturn(getSystem().actorSelection(actorRef.path())).
+ when(mockActorContext).actorSelection(actorRef.path().toString());
+ doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+ doReturn(createTransactionReply(actorRef)).when(mockActorContext).
+ executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
+ eqCreateTransaction(memberName, type), anyDuration());
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
+ anyString(), eq(actorRef.path().toString()));
+ doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
+
+ return actorRef;
+ }
- actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable());
+ @Test
+ public void testRead() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
- CheckedFuture<Boolean, ReadFailedException> exists =
- transactionProxy.exists(TestModel.TEST_PATH);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- Assert.assertFalse(exists.checkedGet());
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable());
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- exists = transactionProxy.exists(TestModel.TEST_PATH);
+ assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
- Assert.assertTrue(exists.checkedGet());
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- actorContext.setExecuteRemoteOperationResponse("bad message");
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- exists = transactionProxy.exists(TestModel.TEST_PATH);
+ readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- try {
- exists.checkedGet();
- fail();
- } catch(ReadFailedException e){
- }
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
}
@Test(expected = ReadFailedException.class)
public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
-
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
-
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>
- read = transactionProxy.read(TestModel.TEST_PATH);
-
- read.checkedGet();
+ transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
}
- @Test
- public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
- final ActorContext actorContext = mock(ActorContext.class);
-
- when(actorContext.executeShardOperation(anyString(), any(), any(
- FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
+ @Test(expected = TestException.class)
+ public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ doThrow(new TestException()).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
-
- Assert.assertFalse(read.get().isPresent());
-
+ try {
+ transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ // Expected - throw cause - expects TestException.
+ throw e.getCause();
+ }
}
+ private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
+ throws Throwable {
- @Test
- public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
- final ActorContext actorContext = mock(ActorContext.class);
+ doThrow(exToThrow).when(mockActorContext).executeShardOperation(
+ anyString(), any(), anyDuration());
- when(actorContext.executeShardOperation(anyString(), any(), any(
- FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ try {
+ invoker.invoke(transactionProxy);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ // Expected - throw cause - expects TestException.
+ throw e.getCause();
+ }
+ }
+ private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
+ testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
+ @Override
+ public void invoke(TransactionProxy proxy) throws Exception {
+ proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ }
+ });
+ }
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
+ @Test(expected = PrimaryNotFoundException.class)
+ public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
+ testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
+ }
- Assert.assertFalse(read.get().isPresent());
+ @Test(expected = TimeoutException.class)
+ public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
+ testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
+ new Exception("reason")));
+ }
+ @Test(expected = TestException.class)
+ public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
+ testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
@Test
- public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
- final ActorContext actorContext = mock(ActorContext.class);
-
- when(actorContext.executeShardOperation(anyString(), any(), any(
- FiniteDuration.class))).thenThrow(new NullPointerException());
-
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ public void testExists() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- try {
- ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
- transactionProxy.read(TestModel.TEST_PATH);
- fail("A null pointer exception was expected");
- } catch(NullPointerException e){
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
- }
- }
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+ assertEquals("Exists response", false, exists);
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
- @Test
- public void testWrite() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ assertEquals("Exists response", true, exists);
+ }
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ @Test(expected = PrimaryNotFoundException.class)
+ public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
+ testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
+ @Override
+ public void invoke(TransactionProxy proxy) throws Exception {
+ proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ }
+ });
+ }
- transactionProxy.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+ @Test(expected = ReadFailedException.class)
+ public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
- Assert.assertNotNull(messages);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- Assert.assertTrue(messages instanceof List);
+ transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ }
- List<Object> listMessages = (List<Object>) messages;
+ @Test(expected = TestException.class)
+ public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
- Assert.assertEquals(1, listMessages.size());
+ doThrow(new TestException()).when(mockActorContext).
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
- Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
- }
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
- private Object createPrimaryFound(ActorRef actorRef) {
- return new PrimaryFound(actorRef.path().toString()).toSerializable();
+ try {
+ transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ // Expected - throw cause - expects TestException.
+ throw e.getCause();
+ }
}
@Test
- public void testMerge() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ public void testWrite() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- transactionProxy.merge(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ }
- Assert.assertNotNull(messages);
+ @Test
+ public void testMerge() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- Assert.assertTrue(messages instanceof List);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
- List<Object> listMessages = (List<Object>) messages;
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- Assert.assertEquals(1, listMessages.size());
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
}
@Test
public void testDelete() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
-
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
transactionProxy.delete(TestModel.TEST_PATH);
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
-
- Assert.assertNotNull(messages);
-
- Assert.assertTrue(messages instanceof List);
-
- List<Object> listMessages = (List<Object>) messages;
-
- Assert.assertEquals(1, listMessages.size());
-
- Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData());
}
+ @SuppressWarnings("unchecked")
@Test
public void testReady() throws Exception {
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef doNothingActorRef = getSystem().actorOf(props);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
- actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
transactionProxy.read(TestModel.TEST_PATH);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
-
+ assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
}
@Test
- public void testGetIdentifier(){
- final Props props = Props.create(DoNothingActor.class);
- final ActorRef doNothingActorRef = getSystem().actorOf(props);
-
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
-
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
-
- Assert.assertNotNull(transactionProxy.getIdentifier());
+ public void testGetIdentifier() {
+ setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+
+ Object id = transactionProxy.getIdentifier();
+ assertNotNull("getIdentifier returned null", id);
+ assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
}
+ @SuppressWarnings("unchecked")
@Test
- public void testClose(){
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ public void testClose() throws Exception{
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
- final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
- actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
- actorContext.setExecuteRemoteOperationResponse("message");
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- TransactionProxy transactionProxy =
- new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
transactionProxy.read(TestModel.TEST_PATH);
transactionProxy.close();
- Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
-
- Assert.assertNotNull(messages);
-
- Assert.assertTrue(messages instanceof List);
-
- List<Object> listMessages = (List<Object>) messages;
-
- Assert.assertEquals(1, listMessages.size());
-
- Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
- }
-
- private CreateTransactionReply createTransactionReply(ActorRef actorRef){
- return CreateTransactionReply.newBuilder()
- .setTransactionActorPath(actorRef.path().toString())
- .setTransactionId("txn-1")
- .build();
+ verify(mockActorContext).sendRemoteOperationAsync(
+ eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
}
}
package org.opendaylight.controller.cluster.datastore.utils;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
+
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
}
private static Props props(final boolean found, final ActorRef actorRef){
- return Props.create(new Creator<MockShardManager>() {
+ return Props.create(new MockShardManagerCreator(found, actorRef) );
+ }
- @Override public MockShardManager create()
- throws Exception {
- return new MockShardManager(found,
- actorRef);
- }
- });
+ @SuppressWarnings("serial")
+ private static class MockShardManagerCreator implements Creator<MockShardManager> {
+ final boolean found;
+ final ActorRef actorRef;
+
+ MockShardManagerCreator(boolean found, ActorRef actorRef) {
+ this.found = found;
+ this.actorRef = actorRef;
+ }
+
+ @Override
+ public MockShardManager create() throws Exception {
+ return new MockShardManager(found, actorRef);
+ }
}
}
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardManagerActorRef = getSystem()
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef shardManagerActorRef = getSystem()
}};
}
+
+ @Test
+ public void testExecuteRemoteOperation() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("3 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+
+ assertEquals("hello", out);
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testExecuteRemoteOperationAsync() {
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("3 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
+ Duration.create(3, TimeUnit.SECONDS));
+
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch(Exception e) {
+ throw new AssertionError(e);
+ }
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
}
package org.opendaylight.controller.cluster.datastore.utils;
-
+import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
public class MockActorContext extends ActorContext {
- private Object executeShardOperationResponse;
- private Object executeRemoteOperationResponse;
- private Object executeLocalOperationResponse;
- private Object executeLocalShardOperationResponse;
+ private volatile Object executeShardOperationResponse;
+ private volatile Object executeRemoteOperationResponse;
+ private volatile Object executeLocalOperationResponse;
+ private volatile Object executeLocalShardOperationResponse;
+ private volatile Exception executeRemoteOperationFailure;
+ private volatile Object inputMessage;
public MockActorContext(ActorSystem actorSystem) {
super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
executeRemoteOperationResponse = response;
}
+ public void setExecuteRemoteOperationFailure(Exception executeRemoteOperationFailure) {
+ this.executeRemoteOperationFailure = executeRemoteOperationFailure;
+ }
+
public void setExecuteLocalOperationResponse(
Object executeLocalOperationResponse) {
this.executeLocalOperationResponse = executeLocalOperationResponse;
this.executeLocalShardOperationResponse = executeLocalShardOperationResponse;
}
- @Override public Object executeLocalOperation(ActorRef actor,
+ @SuppressWarnings("unchecked")
+ public <T> T getInputMessage(Class<T> expType) throws Exception {
+ assertNotNull("Input message was null", inputMessage);
+ return (T) expType.getMethod("fromSerializable", Object.class).invoke(null, inputMessage);
+ }
+
+ @Override
+ public Object executeLocalOperation(ActorRef actor,
Object message, FiniteDuration duration) {
return this.executeLocalOperationResponse;
}
- @Override public Object executeLocalShardOperation(String shardName,
+ @Override
+ public Object executeLocalShardOperation(String shardName,
Object message, FiniteDuration duration) {
return this.executeLocalShardOperationResponse;
}
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.opendaylight.yangtools.util.PropertyUtils;
-
import com.google.common.collect.ImmutableMap;
/**
public final class DomInmemoryDataBrokerModule extends
org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
- private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-future-callback-queue.size";
- private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000;
-
- private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP =
- "mdsal.datastore-future-callback-pool.size";
- private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20;
- private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-commit-queue.size";
- private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
-
public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
* system it's running on.
*/
ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
- PropertyUtils.getIntSystemProperty(
- COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit");
+ getMaxDataBrokerCommitQueueSize(), "WriteTxCommit");
/*
* We use an executor for commit ListenableFuture callbacks that favors reusing available
* reached, subsequent submitted tasks will block the caller.
*/
Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
- PropertyUtils.getIntSystemProperty(
- FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP,
- DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE),
- PropertyUtils.getIntSystemProperty(
- FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP,
- DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures");
+ getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(),
+ "CommitFutures");
DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
new DeadlockDetectingListeningExecutorService(commitExecutor,
}
}
}
+
+ leaf max-data-broker-future-callback-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data broker's commit future callback executor.";
+ }
+
+ leaf max-data-broker-future-callback-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for the data broker's commit future callback executor.";
+ }
+
+ leaf max-data-broker-commit-queue-size {
+ default 5000;
+ type uint16;
+ description "The maximum queue size for the data broker's commit executor.";
+ }
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
+ return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
@Override
public java.lang.AutoCloseable createInstance() {
- return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency());
+ return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency(),
+ InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(),
+ getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize()));
}
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.util.ExecutorServiceUtil;
-import org.opendaylight.yangtools.util.PropertyUtils;
import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
}
};
- private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-dcl-notification-queue.size";
-
- private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
-
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
final ExecutorService dataChangeListenerExecutor) {
+ this(name, listeningExecutor, dataChangeListenerExecutor,
+ InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+ }
+
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ final ExecutorService dataChangeListenerExecutor, int maxDataChangeListenerQueueSize) {
this.name = Preconditions.checkNotNull(name);
this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
- int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
- DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
-
dataChangeListenerNotificationManager =
new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
- DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
+ DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
+ "DataChangeListenerQueueMgr");
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+/**
+ * Holds configuration properties when creating an {@link InMemoryDOMDataStore} instance via the
+ * {@link InMemoryDOMDataStoreFactory}
+ *
+ * @author Thomas Pantelis
+ * @see InMemoryDOMDataStoreFactory
+ */
+public class InMemoryDOMDataStoreConfigProperties {
+
+ public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000;
+ public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20;
+ public static final int DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE = 1000;
+
+ private static final InMemoryDOMDataStoreConfigProperties DEFAULT =
+ create(DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE,
+ DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE,
+ DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
+
+ private final int maxDataChangeExecutorQueueSize;
+ private final int maxDataChangeExecutorPoolSize;
+ private final int maxDataChangeListenerQueueSize;
+
+ /**
+ * Constructs an instance with the given property values.
+ *
+ * @param maxDataChangeExecutorPoolSize
+ * maximum thread pool size for the data change notification executor.
+ * @param maxDataChangeExecutorQueueSize
+ * maximum queue size for the data change notification executor.
+ * @param maxDataChangeListenerQueueSize
+ * maximum queue size for the data change listeners.
+ */
+ public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize,
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
+ return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize,
+ maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize);
+ }
+
+ /**
+ * Returns the InMemoryDOMDataStoreConfigProperties instance with default values.
+ */
+ public static InMemoryDOMDataStoreConfigProperties getDefault() {
+ return DEFAULT;
+ }
+
+ private InMemoryDOMDataStoreConfigProperties(int maxDataChangeExecutorPoolSize,
+ int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) {
+ this.maxDataChangeExecutorQueueSize = maxDataChangeExecutorQueueSize;
+ this.maxDataChangeExecutorPoolSize = maxDataChangeExecutorPoolSize;
+ this.maxDataChangeListenerQueueSize = maxDataChangeListenerQueueSize;
+ }
+
+ /**
+ * Returns the maximum queue size for the data change notification executor.
+ */
+ public int getMaxDataChangeExecutorQueueSize() {
+ return maxDataChangeExecutorQueueSize;
+ }
+
+ /**
+ * Returns the maximum thread pool size for the data change notification executor.
+ */
+ public int getMaxDataChangeExecutorPoolSize() {
+ return maxDataChangeExecutorPoolSize;
+ }
+
+ /**
+ * Returns the maximum queue size for the data change listeners.
+ */
+ public int getMaxDataChangeListenerQueueSize() {
+ return maxDataChangeListenerQueueSize;
+ }
+}
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.opendaylight.yangtools.util.PropertyUtils;
import com.google.common.util.concurrent.MoreExecutors;
/**
*/
public final class InMemoryDOMDataStoreFactory {
- private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP =
- "mdsal.datastore-dcl-notification-queue.size";
- private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000;
-
- private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP =
- "mdsal.datastore-dcl-notification-pool.size";
- private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20;
-
private InMemoryDOMDataStoreFactory() {
}
+ public static InMemoryDOMDataStore create(final String name,
+ @Nullable final SchemaService schemaService) {
+ return create(name, schemaService, null);
+ }
+
/**
* Creates an InMemoryDOMDataStore instance.
*
* @param name the name of the data store
* @param schemaService the SchemaService to which to register the data store.
+ * @param properties configuration properties for the InMemoryDOMDataStore instance. If null,
+ * default property values are used.
* @return an InMemoryDOMDataStore instance
*/
public static InMemoryDOMDataStore create(final String name,
- @Nullable final SchemaService schemaService) {
+ @Nullable final SchemaService schemaService,
+ @Nullable final InMemoryDOMDataStoreConfigProperties properties) {
+
+ InMemoryDOMDataStoreConfigProperties actualProperties = properties;
+ if(actualProperties == null) {
+ actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault();
+ }
// For DataChangeListener notifications we use an executor that provides the fastest
// task execution time to get higher throughput as DataChangeListeners typically provide
// much of the business logic for a data model. If the executor queue size limit is reached,
// subsequent submitted notifications will block the calling thread.
- int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty(
- DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE);
- int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty(
- DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE);
+ int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize();
+ int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize();
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
- dataChangeListenerExecutor);
+ dataChangeListenerExecutor, actualProperties.getMaxDataChangeListenerQueueSize());
if(schemaService != null) {
schemaService.registerSchemaContextListener(dataStore);
when "/config:modules/config:module/config:type = 'inmemory-config-datastore-provider'";
container schema-service {
- uses config:service-ref {
+ uses config:service-ref {
refine type {
mandatory false;
config:required-identity sal:schema-service;
}
- }
+ }
+ }
+
+ leaf max-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change notification executor.";
+ }
+
+ leaf max-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for the data change notification executor.";
+ }
+
+ leaf max-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change listeners.";
}
}
}
+ // Augments the 'configuration' choice node under modules/module.
+ augment "/config:modules/config:module/config:configuration" {
+ case inmemory-operational-datastore-provider {
+ when "/config:modules/config:module/config:type = 'inmemory-operational-datastore-provider'";
+ // Yang does not allow two cases from same namespaces with same children
+ // Schema-service dependency renamed to operational-schema-service
+ // to prevent conflict with schema-service container from inmemory-config-datastore-provider
+ container operational-schema-service {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity sal:schema-service;
+ }
+ }
+ }
+
+ leaf max-data-change-executor-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change notification executor.";
+ }
- // Augments the 'configuration' choice node under modules/module.
- augment "/config:modules/config:module/config:configuration" {
- case inmemory-operational-datastore-provider {
- when "/config:modules/config:module/config:type = 'inmemory-operational-datastore-provider'";
+ leaf max-data-change-executor-pool-size {
+ default 20;
+ type uint16;
+ description "The maximum thread pool size for the data change notification executor.";
+ }
- // Yang does not allow two cases from same namespaces with same children
- // Schema-service dependency renamed to operational-schema-service
- // to prevent conflict with schema-service container from inmemory-config-datastore-provider
- container operational-schema-service {
- uses config:service-ref {
- refine type {
- mandatory false;
- config:required-identity sal:schema-service;
- }
- }
- }
+ leaf max-data-change-listener-queue-size {
+ default 1000;
+ type uint16;
+ description "The maximum queue size for the data change listeners.";
}
}
+ }
}
final CompositeNode schemasNode =
(CompositeNode) NetconfMessageTransformUtil.findNode(schemasNodeResult.getResult(), DATA_STATE_SCHEMAS_IDENTIFIER);
- return create(schemasNode);
+ return create(id, schemasNode);
}
/**
* Parse response of get(netconf-state/schemas) to find all schemas under netconf-state/schemas
*/
@VisibleForTesting
- protected static NetconfStateSchemas create(final CompositeNode schemasNode) {
+ protected static NetconfStateSchemas create(final RemoteDeviceId id, final CompositeNode schemasNode) {
final Set<RemoteYangSchema> availableYangSchemas = Sets.newHashSet();
for (final CompositeNode schemaNode : schemasNode.getCompositesByName(Schema.QNAME.withoutRevision())) {
- availableYangSchemas.add(RemoteYangSchema.createFromCompositeNode(schemaNode));
+ final Optional<RemoteYangSchema> fromCompositeNode = RemoteYangSchema.createFromCompositeNode(id, schemaNode);
+ if(fromCompositeNode.isPresent()) {
+ availableYangSchemas.add(fromCompositeNode.get());
+ }
}
return new NetconfStateSchemas(availableYangSchemas);
return qname;
}
- static RemoteYangSchema createFromCompositeNode(final CompositeNode schemaNode) {
+ static Optional<RemoteYangSchema> createFromCompositeNode(final RemoteDeviceId id, final CompositeNode schemaNode) {
Preconditions.checkArgument(schemaNode.getKey().equals(Schema.QNAME.withoutRevision()), "Wrong QName %s", schemaNode.getKey());
QName childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_FORMAT.withoutRevision();
final String formatAsString = getSingleChildNodeValue(schemaNode, childNode).get();
- Preconditions.checkArgument(formatAsString.equals(Yang.QNAME.getLocalName()),
- "Expecting format to be only %s, not %s", Yang.QNAME.getLocalName(), formatAsString);
+ if(formatAsString.equals(Yang.QNAME.getLocalName()) == false) {
+ logger.debug("{}: Ignoring schema due to unsupported format: {}", id, formatAsString);
+ return Optional.absent();
+ }
childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_LOCATION.withoutRevision();
final Set<String> locationsAsString = getAllChildNodeValues(schemaNode, childNode);
- Preconditions.checkArgument(locationsAsString.contains(Schema.Location.Enumeration.NETCONF.toString()),
- "Expecting location to be %s, not %s", Schema.Location.Enumeration.NETCONF.toString(), locationsAsString);
+ if(locationsAsString.contains(Schema.Location.Enumeration.NETCONF.toString()) == false) {
+ logger.debug("{}: Ignoring schema due to unsupported location: {}", id, locationsAsString);
+ return Optional.absent();
+ }
childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_NAMESPACE.withoutRevision();
final String namespaceAsString = getSingleChildNodeValue(schemaNode, childNode).get();
? QName.create(namespaceAsString, revisionAsString.get(), moduleNameAsString)
: QName.create(URI.create(namespaceAsString), null, moduleNameAsString).withoutRevision();
- return new RemoteYangSchema(moduleQName);
+ return Optional.of(new RemoteYangSchema(moduleQName));
}
private static Set<String> getAllChildNodeValues(final CompositeNode schemaNode, final QName childNodeQName) {
import java.util.Set;
import org.junit.Test;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;