From: Devin Avery Date: Wed, 20 Aug 2014 13:14:36 +0000 (+0000) Subject: Merge "BUG 932 - Swagger HTTP POST contains incorrect object" X-Git-Tag: release/helium~261 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=63b36aa3537d77bd9be323e1113716ef2cd54098;hp=6ab0aae9f5fcc5a464670c85e8249c575c4d9b9e Merge "BUG 932 - Swagger HTTP POST contains incorrect object" --- diff --git a/.gitignore b/.gitignore index 9144cda4cc..b304ffce87 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,6 @@ maven-eclipse.xml .DS_STORE .metadata opendaylight/md-sal/sal-distributed-datastore/journal +!opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin + diff --git a/features/config-netty/pom.xml b/features/config-netty/pom.xml index 2f4b4b1e21..5fbc463df3 100644 --- a/features/config-netty/pom.xml +++ b/features/config-netty/pom.xml @@ -9,7 +9,7 @@ features-config-netty - pom + jar features.xml @@ -21,7 +21,6 @@ features-config-persister features xml - runtime org.opendaylight.controller @@ -47,6 +46,11 @@ org.opendaylight.controller config-netty-config + + + org.opendaylight.yangtools + features-test + @@ -92,6 +96,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/features/config-persister/pom.xml b/features/config-persister/pom.xml index 6dc8941345..3346c754d6 100644 --- a/features/config-persister/pom.xml +++ b/features/config-persister/pom.xml @@ -9,7 +9,7 @@ features-config-persister - pom + jar features.xml @@ -22,21 +22,18 @@ ${yangtools.version} features xml - runtime org.opendaylight.controller features-netconf features xml - runtime org.opendaylight.controller features-config features xml - runtime org.opendaylight.controller @@ -82,6 +79,11 @@ org.eclipse.persistence org.eclipse.persistence.moxy + + + org.opendaylight.yangtools + features-test + @@ -127,6 +129,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/features/config/pom.xml b/features/config/pom.xml index c69e11bed2..8c061c2736 100644 --- a/features/config/pom.xml +++ b/features/config/pom.xml @@ -9,7 +9,7 @@ features-config - pom + jar features.xml @@ -22,7 +22,6 @@ ${yangtools.version} features xml - runtime org.opendaylight.controller @@ -92,6 +91,11 @@ org.opendaylight.controller config-manager + + + org.opendaylight.yangtools + features-test + @@ -137,6 +141,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/features/config/src/main/resources/features.xml b/features/config/src/main/resources/features.xml index 6c0d32427d..5027588acb 100644 --- a/features/config/src/main/resources/features.xml +++ b/features/config/src/main/resources/features.xml @@ -6,7 +6,7 @@ mvn:org.opendaylight.yangtools/features-yangtools/${yangtools.version}/xml/features - odl-mdsal-common + odl-mdsal-common odl-config-api odl-config-netty-config-api odl-config-core diff --git a/features/flow/pom.xml b/features/flow/pom.xml index 09bb6c91e6..ac189737d9 100644 --- a/features/flow/pom.xml +++ b/features/flow/pom.xml @@ -9,7 +9,7 @@ features-flow - pom + jar features.xml @@ -22,7 +22,6 @@ 
${mdsal.version} features xml - runtime org.opendaylight.controller.model @@ -64,6 +63,11 @@ org.opendaylight.controller.md forwardingrules-manager + + + org.opendaylight.yangtools + features-test + @@ -109,6 +113,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/features/mdsal/pom.xml b/features/mdsal/pom.xml index 4f1ba98e5c..e7b825c48b 100644 --- a/features/mdsal/pom.xml +++ b/features/mdsal/pom.xml @@ -9,7 +9,7 @@ features-mdsal - pom + jar features.xml @@ -21,28 +21,24 @@ features-yangtools features xml - runtime org.opendaylight.controller features-config features xml - runtime org.opendaylight.controller features-config-persister features xml - runtime org.opendaylight.controller features-config-netty features xml - runtime org.opendaylight.controller @@ -172,6 +168,12 @@ org.opendaylight.controller.samples toaster-config + + + org.opendaylight.yangtools + features-test + 0.6.2-SNAPSHOT + @@ -217,6 +219,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/features/protocol-framework/pom.xml b/features/protocol-framework/pom.xml index 97836be455..dcd24d6216 100644 --- a/features/protocol-framework/pom.xml +++ b/features/protocol-framework/pom.xml @@ -9,7 +9,7 @@ features-protocol-framework ${protocol-framework.version} - pom + jar features.xml @@ -21,12 +21,16 @@ features-config features xml - runtime org.opendaylight.controller protocol-framework + + + org.opendaylight.yangtools + features-test + @@ -72,6 +76,20 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + org.opendaylight.controller + opendaylight-karaf-empty + ${commons.opendaylight.version} + + + org.opendaylight.yangtools:features-test + + + diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index c74e7ae410..1064afd82d 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -1570,6 +1570,12 @@ toaster-config ${mdsal.version} + + org.opendaylight.yangtools + features-test + ${yangtools.version} + test + org.opendaylight.yangtools features-yangtools diff --git a/opendaylight/distribution/opendaylight-karaf-empty/pom.xml b/opendaylight/distribution/opendaylight-karaf-empty/pom.xml new file mode 100644 index 0000000000..d3dfe19c2b --- /dev/null +++ b/opendaylight/distribution/opendaylight-karaf-empty/pom.xml @@ -0,0 +1,235 @@ + + + 4.0.0 + + org.opendaylight.controller + commons.opendaylight + 1.4.2-SNAPSHOT + ../../commons/opendaylight + + opendaylight-karaf-empty + pom + + 3.0 + + + + + + org.apache.karaf.features + framework + ${karaf.version} + kar + + + + org.apache.karaf.features + standard + ${karaf.version} + features + xml + runtime + + + + + org.opendaylight.controller + karaf.branding + compile + + + + + org.opendaylight.controller + opendaylight-karaf-resources + ${project.version} + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.felix + maven-bundle-plugin + [0,) + + cleanVersions + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [0,) + + copy + unpack + + + + + + + + + org.apache.karaf.tooling + karaf-maven-plugin + [0,) + + commands-generate-help + + + + + + + + + org.fusesource.scalate + maven-scalate-plugin + [0,) + + 
sitegen + + + + + + + + + org.apache.servicemix.tooling + depends-maven-plugin + [0,) + + generate-depends-file + + + + + + + + + + + + + + + org.apache.karaf.tooling + karaf-maven-plugin + ${karaf.version} + true + + + process-resources + + install-kars + + process-resources + + + package + + instance-create-archive + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${checkstyle.version} + + **\/target\/,**\/bin\/,**\/target-ide\/,**\/configuration\/initial\/ + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.6 + + + copy + + copy + + + generate-resources + + + + org.opendaylight.controller + karaf.branding + ${karaf.branding.version} + target/assembly/lib + karaf.branding-${branding.version}.jar + + + + + + unpack-karaf-resources + + unpack-dependencies + + prepare-package + + ${project.build.directory}/assembly + org.opendaylight.controller + opendaylight-karaf-resources + META-INF\/** + true + false + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + prepare-package + + run + + + + + + + + + + + + + + + + + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + HEAD + https://wiki.opendaylight.org/view/OpenDaylight_Controller:Main + + diff --git a/opendaylight/distribution/opendaylight-karaf-resources/pom.xml b/opendaylight/distribution/opendaylight-karaf-resources/pom.xml new file mode 100644 index 0000000000..00495a3201 --- /dev/null +++ b/opendaylight/distribution/opendaylight-karaf-resources/pom.xml @@ -0,0 +1,21 @@ + + + + + 4.0.0 + + org.opendaylight.controller + commons.opendaylight + 1.4.2-SNAPSHOT + ../../commons/opendaylight + + opendaylight-karaf-resources + Resources for opendaylight-karaf + jar + diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/instance b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/instance rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/instance.bat b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance.bat similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/instance.bat rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance.bat diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/karaf b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/karaf rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/karaf.bat b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/karaf/karaf.bat rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/context.xml b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/context.xml similarity index 100% rename from 
opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/context.xml rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/context.xml diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/tomcat-logging.properties b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/tomcat-logging.properties rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/tomcat-server.xml b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/tomcat-server.xml rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/custom.properties b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/custom.properties rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/version.properties b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/version.properties similarity index 100% rename from opendaylight/distribution/opendaylight-karaf/src/main/resources/version.properties rename to opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/version.properties diff --git a/opendaylight/distribution/opendaylight-karaf/pom.xml b/opendaylight/distribution/opendaylight-karaf/pom.xml index b3c3f20ba8..5cbe412b2b 100644 --- a/opendaylight/distribution/opendaylight-karaf/pom.xml +++ b/opendaylight/distribution/opendaylight-karaf/pom.xml @@ -22,76 +22,48 @@ ${karaf.version} kar - - org.opendaylight.controller - base-features - ${project.version} - kar - org.opendaylight.controller karaf.branding compile - - - org.apache.karaf.features - standard - ${karaf.version} - features - xml - runtime - - - org.opendaylight.controller - base-features - ${project.parent.version} - features - xml - runtime - - + + org.opendaylight.controller - extras-features + opendaylight-karaf-resources ${project.version} - kar - runtime - - - - org.opendaylight.controller - features-adsal - features - xml - runtime + + - org.opendaylight.controller - features-nsf - ${project.version} + org.apache.karaf.features + standard + ${karaf.version} features xml - runtime + + org.opendaylight.controller features-mdsal features xml - runtime org.opendaylight.controller features-flow features xml - runtime 
@@ -238,27 +210,45 @@ + + unpack-karaf-resources + + unpack-dependencies + + prepare-package + + ${project.build.directory}/assembly + org.opendaylight.controller + opendaylight-karaf-resources + META-INF\/** + true + false + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + prepare-package + + run + + + + + + + + + + + + - - org.apache.maven.plugins - maven-antrun-plugin - - - prepare-package - - run - - - - - - - - - - - diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/jre.properties b/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/jre.properties deleted file mode 100644 index a98956e98d..0000000000 --- a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/jre.properties +++ /dev/null @@ -1,506 +0,0 @@ -################################################################################ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -################################################################################ - -# -# Java platform package export properties. -# - -# Standard package set. 
Note that: -# - javax.transaction* is exported with a mandatory attribute -jre-1.6= \ - javax.accessibility, \ - javax.activation;version="1.1", \ - javax.activity, \ - javax.crypto, \ - javax.crypto.interfaces, \ - javax.crypto.spec, \ - javax.imageio, \ - javax.imageio.event, \ - javax.imageio.metadata, \ - javax.imageio.plugins.bmp, \ - javax.imageio.plugins.jpeg, \ - javax.imageio.spi, \ - javax.imageio.stream, \ - javax.jws, \ - javax.jws.soap, \ - javax.lang.model, \ - javax.lang.model.element, \ - javax.lang.model.type, \ - javax.lang.model.util, \ - javax.management, \ - javax.management.loading, \ - javax.management.modelmbean, \ - javax.management.monitor, \ - javax.management.openmbean, \ - javax.management.relation, \ - javax.management.remote, \ - javax.management.remote.rmi, \ - javax.management.timer, \ - javax.naming, \ - javax.naming.directory, \ - javax.naming.event, \ - javax.naming.ldap, \ - javax.naming.spi, \ - javax.net, \ - javax.net.ssl, \ - javax.print, \ - javax.print.attribute, \ - javax.print.attribute.standard, \ - javax.print.event, \ - javax.rmi, \ - javax.rmi.CORBA, \ - javax.rmi.ssl, \ - javax.script, \ - javax.security.auth, \ - javax.security.auth.callback, \ - javax.security.auth.kerberos, \ - javax.security.auth.login, \ - javax.security.auth.spi, \ - javax.security.auth.x500, \ - javax.security.cert, \ - javax.security.sasl, \ - javax.sound.midi, \ - javax.sound.midi.spi, \ - javax.sound.sampled, \ - javax.sound.sampled.spi, \ - javax.sql, \ - javax.sql.rowset, \ - javax.sql.rowset.serial, \ - javax.sql.rowset.spi, \ - javax.swing, \ - javax.swing.border, \ - javax.swing.colorchooser, \ - javax.swing.event, \ - javax.swing.filechooser, \ - javax.swing.plaf, \ - javax.swing.plaf.basic, \ - javax.swing.plaf.metal, \ - javax.swing.plaf.multi, \ - javax.swing.plaf.synth, \ - javax.swing.table, \ - javax.swing.text, \ - javax.swing.text.html, \ - javax.swing.text.html.parser, \ - javax.swing.text.rtf, \ - javax.swing.tree, \ - javax.swing.undo, \ - javax.tools, \ - javax.transaction; javax.transaction.xa; partial=true; mandatory:=partial, \ - javax.xml, \ - javax.xml.bind;version="2.2.1", \ - javax.xml.bind.annotation;version="2.2.1", \ - javax.xml.bind.annotation.adapters;version="2.2.1", \ - javax.xml.bind.attachment;version="2.2.1", \ - javax.xml.bind.helpers;version="2.2.1", \ - javax.xml.bind.util;version="2.2.1", \ - javax.xml.crypto, \ - javax.xml.crypto.dom, \ - javax.xml.crypto.dsig, \ - javax.xml.crypto.dsig.dom, \ - javax.xml.crypto.dsig.keyinfo, \ - javax.xml.crypto.dsig.spec, \ - javax.xml.datatype, \ - javax.xml.namespace, \ - javax.xml.parsers, \ - javax.xml.soap;version="1.3", \ - javax.xml.stream;version="1.2", \ - javax.xml.stream.events;version="1.2", \ - javax.xml.stream.util;version="1.2", \ - javax.xml.transform, \ - javax.xml.transform.dom, \ - javax.xml.transform.sax, \ - javax.xml.transform.stax, \ - javax.xml.transform.stream, \ - javax.xml.validation, \ - javax.xml.ws;version="2.2", \ - javax.xml.ws.handler;version="2.2", \ - javax.xml.ws.handler.soap;version="2.2", \ - javax.xml.ws.http;version="2.2", \ - javax.xml.ws.soap;version="2.2", \ - javax.xml.ws.spi;version="2.2", \ - javax.xml.ws.wsaddressing;version="2.2", \ - javax.xml.ws.spi.http;version="2.2", \ - javax.xml.xpath, \ - org.ietf.jgss, \ - org.omg.CORBA, \ - org.omg.CORBA_2_3, \ - org.omg.CORBA_2_3.portable, \ - org.omg.CORBA.DynAnyPackage, \ - org.omg.CORBA.ORBPackage, \ - org.omg.CORBA.portable, \ - org.omg.CORBA.TypeCodePackage, \ - org.omg.CosNaming, \ - 
org.omg.CosNaming.NamingContextExtPackage, \ - org.omg.CosNaming.NamingContextPackage, \ - org.omg.Dynamic, \ - org.omg.DynamicAny, \ - org.omg.DynamicAny.DynAnyFactoryPackage, \ - org.omg.DynamicAny.DynAnyPackage, \ - org.omg.IOP, \ - org.omg.IOP.CodecFactoryPackage, \ - org.omg.IOP.CodecPackage, \ - org.omg.Messaging, \ - org.omg.PortableInterceptor, \ - org.omg.PortableInterceptor.ORBInitInfoPackage, \ - org.omg.PortableServer, \ - org.omg.PortableServer.CurrentPackage, \ - org.omg.PortableServer.POAManagerPackage, \ - org.omg.PortableServer.POAPackage, \ - org.omg.PortableServer.portable, \ - org.omg.PortableServer.ServantLocatorPackage, \ - org.omg.SendingContext, \ - org.omg.stub.java.rmi, \ - org.omg.stub.javax.management.remote.rmi, \ - org.w3c.dom, \ - org.w3c.dom.bootstrap, \ - org.w3c.dom.css, \ - org.w3c.dom.events, \ - org.w3c.dom.html, \ - org.w3c.dom.ls, \ - org.w3c.dom.ranges, \ - org.w3c.dom.stylesheets, \ - org.w3c.dom.traversal, \ - org.w3c.dom.views, \ - org.w3c.dom.xpath, \ - org.xml.sax, \ - org.xml.sax.ext, \ - org.xml.sax.helpers, \ - javax.annotation.processing - -# Standard package set. Note that: -# - javax.transaction* is exported with a mandatory attribute -jre-1.7= \ - javax.accessibility, \ - javax.activation;version="1.1", \ - javax.activity, \ - javax.crypto, \ - javax.crypto.interfaces, \ - javax.crypto.spec, \ - javax.imageio, \ - javax.imageio.event, \ - javax.imageio.metadata, \ - javax.imageio.plugins.bmp, \ - javax.imageio.plugins.jpeg, \ - javax.imageio.spi, \ - javax.imageio.stream, \ - javax.jws, \ - javax.jws.soap, \ - javax.lang.model, \ - javax.lang.model.element, \ - javax.lang.model.type, \ - javax.lang.model.util, \ - javax.management, \ - javax.management.loading, \ - javax.management.modelmbean, \ - javax.management.monitor, \ - javax.management.openmbean, \ - javax.management.relation, \ - javax.management.remote, \ - javax.management.remote.rmi, \ - javax.management.timer, \ - javax.naming, \ - javax.naming.directory, \ - javax.naming.event, \ - javax.naming.ldap, \ - javax.naming.spi, \ - javax.net, \ - javax.net.ssl, \ - javax.print, \ - javax.print.attribute, \ - javax.print.attribute.standard, \ - javax.print.event, \ - javax.rmi, \ - javax.rmi.CORBA, \ - javax.rmi.ssl, \ - javax.script, \ - javax.security.auth, \ - javax.security.auth.callback, \ - javax.security.auth.kerberos, \ - javax.security.auth.login, \ - javax.security.auth.spi, \ - javax.security.auth.x500, \ - javax.security.cert, \ - javax.security.sasl, \ - javax.sound.midi, \ - javax.sound.midi.spi, \ - javax.sound.sampled, \ - javax.sound.sampled.spi, \ - javax.sql, \ - javax.sql.rowset, \ - javax.sql.rowset.serial, \ - javax.sql.rowset.spi, \ - javax.swing, \ - javax.swing.border, \ - javax.swing.colorchooser, \ - javax.swing.event, \ - javax.swing.filechooser, \ - javax.swing.plaf, \ - javax.swing.plaf.basic, \ - javax.swing.plaf.metal, \ - javax.swing.plaf.multi, \ - javax.swing.plaf.synth, \ - javax.swing.table, \ - javax.swing.text, \ - javax.swing.text.html, \ - javax.swing.text.html.parser, \ - javax.swing.text.rtf, \ - javax.swing.tree, \ - javax.swing.undo, \ - javax.tools, \ - javax.transaction; javax.transaction.xa; partial=true; mandatory:=partial, \ - javax.xml, \ - javax.xml.bind;version="2.2.1", \ - javax.xml.bind.annotation;version="2.2.1", \ - javax.xml.bind.annotation.adapters;version="2.2.1", \ - javax.xml.bind.attachment;version="2.2.1", \ - javax.xml.bind.helpers;version="2.2.1", \ - javax.xml.bind.util;version="2.2.1", \ - javax.xml.crypto, \ - 
javax.xml.crypto.dom, \ - javax.xml.crypto.dsig, \ - javax.xml.crypto.dsig.dom, \ - javax.xml.crypto.dsig.keyinfo, \ - javax.xml.crypto.dsig.spec, \ - javax.xml.datatype, \ - javax.xml.namespace, \ - javax.xml.parsers, \ - javax.xml.soap;version="1.3", \ - javax.xml.stream;version="1.2", \ - javax.xml.stream.events;version="1.2", \ - javax.xml.stream.util;version="1.2", \ - javax.xml.transform, \ - javax.xml.transform.dom, \ - javax.xml.transform.sax, \ - javax.xml.transform.stax, \ - javax.xml.transform.stream, \ - javax.xml.validation, \ - javax.xml.ws;version="2.2", \ - javax.xml.ws.handler;version="2.2", \ - javax.xml.ws.handler.soap;version="2.2", \ - javax.xml.ws.http;version="2.2", \ - javax.xml.ws.soap;version="2.2", \ - javax.xml.ws.spi;version="2.2", \ - javax.xml.ws.wsaddressing;version="2.2", \ - javax.xml.ws.spi.http;version="2.2", \ - javax.xml.xpath, \ - org.ietf.jgss, \ - org.omg.CORBA, \ - org.omg.CORBA_2_3, \ - org.omg.CORBA_2_3.portable, \ - org.omg.CORBA.DynAnyPackage, \ - org.omg.CORBA.ORBPackage, \ - org.omg.CORBA.portable, \ - org.omg.CORBA.TypeCodePackage, \ - org.omg.CosNaming, \ - org.omg.CosNaming.NamingContextExtPackage, \ - org.omg.CosNaming.NamingContextPackage, \ - org.omg.Dynamic, \ - org.omg.DynamicAny, \ - org.omg.DynamicAny.DynAnyFactoryPackage, \ - org.omg.DynamicAny.DynAnyPackage, \ - org.omg.IOP, \ - org.omg.IOP.CodecFactoryPackage, \ - org.omg.IOP.CodecPackage, \ - org.omg.Messaging, \ - org.omg.PortableInterceptor, \ - org.omg.PortableInterceptor.ORBInitInfoPackage, \ - org.omg.PortableServer, \ - org.omg.PortableServer.CurrentPackage, \ - org.omg.PortableServer.POAManagerPackage, \ - org.omg.PortableServer.POAPackage, \ - org.omg.PortableServer.portable, \ - org.omg.PortableServer.ServantLocatorPackage, \ - org.omg.SendingContext, \ - org.omg.stub.java.rmi, \ - org.omg.stub.javax.management.remote.rmi, \ - org.w3c.dom, \ - org.w3c.dom.bootstrap, \ - org.w3c.dom.css, \ - org.w3c.dom.events, \ - org.w3c.dom.html, \ - org.w3c.dom.ls, \ - org.w3c.dom.ranges, \ - org.w3c.dom.stylesheets, \ - org.w3c.dom.traversal, \ - org.w3c.dom.views, \ - org.w3c.dom.xpath, \ - org.xml.sax, \ - org.xml.sax.ext, \ - org.xml.sax.helpers, \ - javax.annotation.processing - -jre-1.8= \ - javax.accessibility, \ - javax.activation;version="1.1", \ - javax.activity, \ - javax.crypto, \ - javax.crypto.interfaces, \ - javax.crypto.spec, \ - javax.imageio, \ - javax.imageio.event, \ - javax.imageio.metadata, \ - javax.imageio.plugins.bmp, \ - javax.imageio.plugins.jpeg, \ - javax.imageio.spi, \ - javax.imageio.stream, \ - javax.jws, \ - javax.jws.soap, \ - javax.lang.model, \ - javax.lang.model.element, \ - javax.lang.model.type, \ - javax.lang.model.util, \ - javax.management, \ - javax.management.loading, \ - javax.management.modelmbean, \ - javax.management.monitor, \ - javax.management.openmbean, \ - javax.management.relation, \ - javax.management.remote, \ - javax.management.remote.rmi, \ - javax.management.timer, \ - javax.naming, \ - javax.naming.directory, \ - javax.naming.event, \ - javax.naming.ldap, \ - javax.naming.spi, \ - javax.net, \ - javax.net.ssl, \ - javax.print, \ - javax.print.attribute, \ - javax.print.attribute.standard, \ - javax.print.event, \ - javax.rmi, \ - javax.rmi.CORBA, \ - javax.rmi.ssl, \ - javax.script, \ - javax.security.auth, \ - javax.security.auth.callback, \ - javax.security.auth.kerberos, \ - javax.security.auth.login, \ - javax.security.auth.spi, \ - javax.security.auth.x500, \ - javax.security.cert, \ - javax.security.sasl, \ - 
javax.sound.midi, \ - javax.sound.midi.spi, \ - javax.sound.sampled, \ - javax.sound.sampled.spi, \ - javax.sql, \ - javax.sql.rowset, \ - javax.sql.rowset.serial, \ - javax.sql.rowset.spi, \ - javax.swing, \ - javax.swing.border, \ - javax.swing.colorchooser, \ - javax.swing.event, \ - javax.swing.filechooser, \ - javax.swing.plaf, \ - javax.swing.plaf.basic, \ - javax.swing.plaf.metal, \ - javax.swing.plaf.multi, \ - javax.swing.plaf.synth, \ - javax.swing.table, \ - javax.swing.text, \ - javax.swing.text.html, \ - javax.swing.text.html.parser, \ - javax.swing.text.rtf, \ - javax.swing.tree, \ - javax.swing.undo, \ - javax.tools, \ - javax.transaction; javax.transaction.xa; partial=true; mandatory:=partial, \ - javax.xml, \ - javax.xml.bind;version="2.2.1", \ - javax.xml.bind.annotation;version="2.2.1", \ - javax.xml.bind.annotation.adapters;version="2.2.1", \ - javax.xml.bind.attachment;version="2.2.1", \ - javax.xml.bind.helpers;version="2.2.1", \ - javax.xml.bind.util;version="2.2.1", \ - javax.xml.crypto, \ - javax.xml.crypto.dom, \ - javax.xml.crypto.dsig, \ - javax.xml.crypto.dsig.dom, \ - javax.xml.crypto.dsig.keyinfo, \ - javax.xml.crypto.dsig.spec, \ - javax.xml.datatype, \ - javax.xml.namespace, \ - javax.xml.parsers, \ - javax.xml.soap;version="1.3", \ - javax.xml.stream;version="1.2", \ - javax.xml.stream.events;version="1.2", \ - javax.xml.stream.util;version="1.2", \ - javax.xml.transform, \ - javax.xml.transform.dom, \ - javax.xml.transform.sax, \ - javax.xml.transform.stax, \ - javax.xml.transform.stream, \ - javax.xml.validation, \ - javax.xml.ws;version="2.2", \ - javax.xml.ws.handler;version="2.2", \ - javax.xml.ws.handler.soap;version="2.2", \ - javax.xml.ws.http;version="2.2", \ - javax.xml.ws.soap;version="2.2", \ - javax.xml.ws.spi;version="2.2", \ - javax.xml.ws.wsaddressing;version="2.2", \ - javax.xml.ws.spi.http;version="2.2", \ - javax.xml.xpath, \ - org.ietf.jgss, \ - org.omg.CORBA, \ - org.omg.CORBA_2_3, \ - org.omg.CORBA_2_3.portable, \ - org.omg.CORBA.DynAnyPackage, \ - org.omg.CORBA.ORBPackage, \ - org.omg.CORBA.portable, \ - org.omg.CORBA.TypeCodePackage, \ - org.omg.CosNaming, \ - org.omg.CosNaming.NamingContextExtPackage, \ - org.omg.CosNaming.NamingContextPackage, \ - org.omg.Dynamic, \ - org.omg.DynamicAny, \ - org.omg.DynamicAny.DynAnyFactoryPackage, \ - org.omg.DynamicAny.DynAnyPackage, \ - org.omg.IOP, \ - org.omg.IOP.CodecFactoryPackage, \ - org.omg.IOP.CodecPackage, \ - org.omg.Messaging, \ - org.omg.PortableInterceptor, \ - org.omg.PortableInterceptor.ORBInitInfoPackage, \ - org.omg.PortableServer, \ - org.omg.PortableServer.CurrentPackage, \ - org.omg.PortableServer.POAManagerPackage, \ - org.omg.PortableServer.POAPackage, \ - org.omg.PortableServer.portable, \ - org.omg.PortableServer.ServantLocatorPackage, \ - org.omg.SendingContext, \ - org.omg.stub.java.rmi, \ - org.omg.stub.javax.management.remote.rmi, \ - org.w3c.dom, \ - org.w3c.dom.bootstrap, \ - org.w3c.dom.css, \ - org.w3c.dom.events, \ - org.w3c.dom.html, \ - org.w3c.dom.ls, \ - org.w3c.dom.ranges, \ - org.w3c.dom.stylesheets, \ - org.w3c.dom.traversal, \ - org.w3c.dom.views, \ - org.w3c.dom.xpath, \ - org.xml.sax, \ - org.xml.sax.ext, \ - org.xml.sax.helpers, \ - javax.annotation.processing diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/startup.properties b/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/startup.properties deleted file mode 100644 index ca8c83c380..0000000000 --- 
a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/startup.properties +++ /dev/null @@ -1,53 +0,0 @@ -#Bundles to be started on startup, with startlevel - -# feature: framework version: 3.0.1 -mvn\:org.ops4j.base/ops4j-base-lang/1.4.0 = 5 -mvn\:biz.aQute.bnd/bndlib/2.2.0 = 5 -mvn\:org.ops4j.pax.swissbox/pax-swissbox-bnd/1.7.0 = 5 -mvn\:org.ops4j.pax.url/pax-url-maven-commons/1.6.0 = 5 -mvn\:org.ops4j.pax.url/pax-url-aether/1.6.0 = 5 -mvn\:org.ops4j.pax.url/pax-url-wrap/1.6.0 = 5 -mvn\:javax.annotation/javax.annotation-api/1.2 = 5 -mvn\:org.ops4j.pax.logging/pax-logging-api/1.7.2 = 8 -mvn\:org.ops4j.pax.logging/pax-logging-service/1.7.2 = 8 -mvn\:org.apache.karaf.service/org.apache.karaf.service.guard/3.0.1 = 10 -mvn\:org.apache.felix/org.apache.felix.configadmin/1.6.0 = 10 -mvn\:org.apache.felix/org.apache.felix.fileinstall/3.2.8 = 11 -mvn\:org.ow2.asm/asm-all/4.1 = 12 -mvn\:org.apache.aries/org.apache.aries.util/1.1.0 = 20 -mvn\:org.apache.aries.proxy/org.apache.aries.proxy.api/1.0.0 = 20 -mvn\:org.apache.aries.proxy/org.apache.aries.proxy.impl/1.0.2 = 20 -mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.api/1.0.0 = 20 -mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.cm/1.0.3 = 20 -mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.core.compatibility/1.0.0 = 20 -mvn\:org.apache.aries.blueprint/org.apache.aries.blueprint.core/1.4.0 = 20 -mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.spring/3.0.1 = 24 -mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.blueprint/3.0.1 = 24 -mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.wrap/3.0.1 = 24 -mvn\:org.apache.karaf.region/org.apache.karaf.region.core/3.0.1 = 25 -mvn\:org.apache.karaf.features/org.apache.karaf.features.core/3.0.1 = 25 -mvn\:org.apache.karaf.deployer/org.apache.karaf.deployer.features/3.0.1 = 26 -mvn\:jline/jline/2.11 = 30 -mvn\:org.jledit/core/0.2.1 = 30 -mvn\:org.fusesource.jansi/jansi/1.11 = 30 -mvn\:org.ops4j.base/ops4j-base-util-property/1.4.0 = 30 -mvn\:org.ops4j.base/ops4j-base-util-xml/1.4.0 = 30 -mvn\:org.ops4j.base/ops4j-base-util-collections/1.4.0 = 30 -mvn\:org.ops4j.pax.url/pax-url-commons/1.6.0 = 30 -mvn\:org.ops4j.pax.swissbox/pax-swissbox-property/1.7.0 = 30 -mvn\:org.ops4j.base/ops4j-base-net/1.4.0 = 30 -mvn\:org.ops4j.base/ops4j-base-monitors/1.4.0 = 30 -mvn\:org.apache.karaf.features/org.apache.karaf.features.command/3.0.1 = 30 -mvn\:org.apache.karaf.shell/org.apache.karaf.shell.console/3.0.1 = 30 -mvn\:org.apache.karaf.jaas/org.apache.karaf.jaas.modules/3.0.1 = 30 -mvn\:org.apache.karaf.jaas/org.apache.karaf.jaas.config/3.0.1 = 30 -mvn\:org.apache.karaf.jaas/org.apache.karaf.jaas.boot/3.0.1 = 30 -mvn\:org.apache.sshd/sshd-core/0.9.0 = 30 -mvn\:org.apache.karaf.bundle/org.apache.karaf.bundle.command/3.0.1 = 30 -mvn\:org.apache.karaf.shell/org.apache.karaf.shell.table/3.0.1 = 30 -mvn\:org.apache.karaf.bundle/org.apache.karaf.bundle.core/3.0.1 = 30 -mvn\:org.apache.karaf.shell/org.apache.karaf.shell.help/3.0.1 = 30 -mvn\:org.apache.karaf.system/org.apache.karaf.system.core/3.0.1 = 30 -mvn\:org.apache.karaf.system/org.apache.karaf.system.command/3.0.1 = 30 -mvn\:org.apache.karaf.shell/org.apache.karaf.shell.commands/3.0.1 = 30 -mvn\:org.apache.aries.quiesce/org.apache.aries.quiesce.api/1.0.0 = 30 diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/system.properties b/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/system.properties deleted file mode 100644 index a312d66ad5..0000000000 --- 
a/opendaylight/distribution/opendaylight-karaf/src/main/resources/etc/system.properties +++ /dev/null @@ -1,109 +0,0 @@ -# -# The properties defined in this file will be made available through system -# properties at the very beginning of the Karaf's boot process. -# - -# Use Equinox as default OSGi Framework Implementation -karaf.framework=equinox - -# https://bugs.eclipse.org/bugs/show_bug.cgi?id=325578 -# Extend the framework to avoid the resources to be presented with -# a URL of type bundleresource: but to be presented as file: -osgi.hook.configurators.include=org.eclipse.virgo.kernel.equinox.extensions.hooks.ExtensionsHookConfigurator - - -# Log level when the pax-logging service is not available -# This level will only be used while the pax-logging service bundle -# is not fully available. -# To change log levels, please refer to the org.ops4j.pax.logging.cfg file -# instead. -org.ops4j.pax.logging.DefaultServiceLog.level = ERROR - -# -# Name of this Karaf instance. -# -karaf.name = root - -# -# Default repository where bundles will be loaded from before using -# other Maven repositories. For the full Maven configuration, see -# the org.ops4j.pax.url.mvn.cfg file. -# -karaf.default.repository = system - -# -# Location of a shell script that will be run when starting a shell -# session. This script can be used to create aliases and define -# additional commands. -# -karaf.shell.init.script = ${karaf.etc}/shell.init.script - -# -# Sets the maximum size of the shell command history. If not set, -# defaults to 500 entries. Setting to 0 will disable history. -# -# karaf.shell.history.maxSize = 0 - -# -# Deletes the entire karaf.data directory at every start -# -karaf.clean.all = false - -# -# Deletes the karaf.data/cache directory at every start -# -karaf.clean.cache = false - -# -# Roles to use when logging into a local Karaf console. -# -# The syntax is the following: -# [classname:]principal -# where classname is the class name of the principal object -# (defaults to org.apache.karaf.jaas.modules.RolePrincipal) -# and principal is the name of the principal of that class -# (defaults to instance). -# -karaf.local.roles = admin,manager,viewer - -# -# Set this empty property to avoid errors when validating xml documents. -# -xml.catalog.files = - -# -# Suppress the bell in the console when hitting backspace too many times -# for example -# -jline.nobell = true - -# -# ServiceMix specs options -# -org.apache.servicemix.specs.debug = false -org.apache.servicemix.specs.timeout = 0 - -# -# Settings for the OSGi 4.3 Weaving -# By default, we will not weave any classes. Change this setting to include classes -# that you application needs to have woven. -# -org.apache.aries.proxy.weaving.enabled = none -# Classes not to weave - Aries default + Xerces which is known to have issues. -org.apache.aries.proxy.weaving.disabled = org.objectweb.asm.*,org.slf4j.*,org.apache.log4j.*,javax.*,org.apache.xerces.* - -# -# By default, only Karaf shell commands are secured, but additional services can be -# secured by expanding this filter -# -karaf.secured.services = (&(osgi.command.scope=*)(osgi.command.function=*)) - -# -# Security properties -# -# To enable OSGi security, uncomment the properties below, -# install the framework-security feature and restart. 
-# -#java.security.policy=${karaf.etc}/all.policy -#org.osgi.framework.security=osgi -#org.osgi.framework.trust.repositories=${karaf.etc}/trustStore.ks diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/AbstractBrokerAwareActivator.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/AbstractBrokerAwareActivator.java index b62e4529f3..bd78c584ee 100644 --- a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/AbstractBrokerAwareActivator.java +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/AbstractBrokerAwareActivator.java @@ -39,14 +39,20 @@ public abstract class AbstractBrokerAwareActivator implements BundleActivator { @Override public void modifiedService(ServiceReference reference, BindingAwareBroker service) { - // TODO Auto-generated method stub - + removedService(reference, service); + addingService(reference); } @Override public void removedService(ServiceReference reference, BindingAwareBroker service) { - // TODO Auto-generated method stub + broker = context.getService(reference); + mdActivationPool.execute(new Runnable() { + @Override + public void run() { + onBrokerRemoved(broker, context); + } + }); } }; @@ -117,6 +123,6 @@ public abstract class AbstractBrokerAwareActivator implements BundleActivator { protected abstract void onBrokerAvailable(BindingAwareBroker broker, BundleContext context); protected void onBrokerRemoved(BindingAwareBroker broker, BundleContext context) { - + stopImpl(context); } } diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml index 6db4d3a094..4419d19f52 100644 --- a/opendaylight/md-sal/sal-clustering-commons/pom.xml +++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml @@ -64,6 +64,10 @@ org.opendaylight.yangtools yang-parser-impl + + org.opendaylight.controller + netconf-util + xmlunit @@ -156,6 +160,12 @@ jsr305 2.0.1 + + + com.codahale.metrics + metrics-core + 3.0.1 + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java new file mode 100644 index 0000000000..646431522e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.common.actor; + +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.BoundedMailbox; +import akka.dispatch.MailboxType; +import akka.dispatch.MessageQueue; +import akka.dispatch.ProducesMessageQueue; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import org.opendaylight.controller.common.reporting.MetricsReporter; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue { + + private MeteredMessageQueue queue; + private Integer capacity; + private FiniteDuration pushTimeOut; + private ActorPath actorPath; + private MetricsReporter reporter; + + private final String QUEUE_SIZE = "queue-size"; + private final Long DEFAULT_TIMEOUT = 10L; + + public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) { + Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" ); + this.capacity = config.getInt("mailbox-capacity"); + Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0"); + + Long timeout = -1L; + if ( config.hasPath("mailbox-push-timeout-time") ){ + timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS); + } else { + timeout = DEFAULT_TIMEOUT; + } + Preconditions.checkArgument( timeout > 0, "mailbox-push-timeout-time must be > 0"); + this.pushTimeOut = new FiniteDuration(timeout, TimeUnit.NANOSECONDS); + + reporter = MetricsReporter.getInstance(); + } + + + @Override + public MessageQueue create(final scala.Option owner, scala.Option system) { + this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut); + monitorQueueSize(owner, this.queue); + return this.queue; + } + + private void monitorQueueSize(scala.Option owner, final MeteredMessageQueue monitoredQueue) { + if (owner.isEmpty()) { + return; //there's no actor to monitor + } + actorPath = owner.get().path(); + MetricRegistry registry = reporter.getMetricsRegistry(); + + String actorName = registry.name(actorPath.toString(), QUEUE_SIZE); + + if (registry.getMetrics().containsKey(actorName)) + return; //already registered + + reporter.getMetricsRegistry().register(actorName, + new Gauge() { + @Override + public Integer getValue() { + return monitoredQueue.size(); + } + }); + } + + + public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue { + + public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) { + super(capacity, pushTimeOut); + } + } + +} + diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java new file mode 100644 index 0000000000..5c3e11f8b8 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/reporting/MetricsReporter.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.common.reporting; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; + +/** + * Maintains metrics registry that is provided to reporters. + * At the moment only one reporter exists {@code JmxReporter}. + * More reporters can be added. + *

+ * The consumers of this class will only be interested in {@code MetricsRegistry} + * where metrics for that consumer gets stored. + */ +public class MetricsReporter implements AutoCloseable{ + + private final MetricRegistry METRICS_REGISTRY = new MetricRegistry(); + private final String DOMAIN = "org.opendaylight.controller"; + + public final JmxReporter jmxReporter = JmxReporter.forRegistry(METRICS_REGISTRY).inDomain(DOMAIN).build(); + + private static MetricsReporter inst = new MetricsReporter(); + + private MetricsReporter(){ + jmxReporter.start(); + } + + public static MetricsReporter getInstance(){ + return inst; + } + + public MetricRegistry getMetricsRegistry(){ + return METRICS_REGISTRY; + } + + @Override + public void close() throws Exception { + jmxReporter.close(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/InstanceIdentifierForXmlCodec.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/InstanceIdentifierForXmlCodec.java similarity index 99% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/InstanceIdentifierForXmlCodec.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/InstanceIdentifierForXmlCodec.java index 92a7fbae79..a9a49142a9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/InstanceIdentifierForXmlCodec.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/InstanceIdentifierForXmlCodec.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.xml.codec; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RandomPrefix.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/RandomPrefix.java similarity index 96% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RandomPrefix.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/RandomPrefix.java index 55cc8192a8..1349e1ece3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RandomPrefix.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/RandomPrefix.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.xml.codec; import org.opendaylight.yangtools.yang.common.QName; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlDocumentUtils.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlDocumentUtils.java similarity index 99% rename from 
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlDocumentUtils.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlDocumentUtils.java index b4cca1ab48..8af6a3140b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlDocumentUtils.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlDocumentUtils.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.xml.codec; import com.google.common.base.Function; import com.google.common.base.Objects; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlStreamUtils.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java similarity index 99% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlStreamUtils.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java index e4576c445b..c9d5e89ae1 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlStreamUtils.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlStreamUtils.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.xml.codec; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlUtils.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java similarity index 99% rename from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlUtils.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java index e07401a3e0..5848561676 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlUtils.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/xml/codec/XmlUtils.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.xml.codec; import com.google.common.base.Optional; import org.opendaylight.controller.netconf.util.xml.XmlUtil; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java new file mode 100644 index 0000000000..bfdb0930b1 --- /dev/null +++ 
b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/common/actor/MeteredBoundedMailboxTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.common.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.DeadLetter; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; +import akka.testkit.JavaTestKit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class MeteredBoundedMailboxTest { + + private static ActorSystem actorSystem; + private final ReentrantLock lock = new ReentrantLock(); + + @Before + public void setUp() throws Exception { + actorSystem = ActorSystem.create("testsystem"); + } + + @After + public void tearDown() throws Exception { + if (actorSystem != null) + actorSystem.shutdown(); + } + + @Test + public void test_WhenQueueIsFull_ShouldSendMsgToDeadLetter() throws InterruptedException { + final JavaTestKit mockReceiver = new JavaTestKit(actorSystem); + actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class); + + + final FiniteDuration ONE_SEC = new FiniteDuration(1, TimeUnit.SECONDS); + String boundedMailBox = actorSystem.name() + ".bounded-mailbox"; + ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox), + "pingpongactor"); + + actorSystem.mailboxes().settings(); + lock.lock(); + //queue capacity = 10 + //need to send 12 messages; 1 message is dequeued and actor waits on lock, + //2nd to 11th messages are put on the queue + //12th message is sent to dead letter. 
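+    //(clarifying note, an addition to the original comment: per Akka's BoundedMailbox semantics the 12th tell()
+    // blocks for up to mailbox-push-timeout-time, 100ms in src/test/resources/application.conf, and once that
+    // timeout expires the undeliverable message is published on the event stream as a DeadLetter, which is what
+    // the mockReceiver subscription above observes.)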
+ for (int i=0;i<12;i++){ + pingPongActor.tell("ping", mockReceiver.getRef()); + } + + mockReceiver.expectMsgClass(ONE_SEC, DeadLetter.class); + + lock.unlock(); + + Object[] eleven = mockReceiver.receiveN(11, ONE_SEC); + } + + /** + * For testing + */ + public static class PingPongActor extends UntypedActor{ + + ReentrantLock lock; + + private PingPongActor(ReentrantLock lock){ + this.lock = lock; + } + + public static Props props(final ReentrantLock lock){ + return Props.create(new Creator(){ + @Override + public PingPongActor create() throws Exception { + return new PingPongActor(lock); + } + }); + } + + @Override + public void onReceive(Object message) throws Exception { + lock.lock(); + if ("ping".equals(message)) + getSender().tell("pong", getSelf()); + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/XmlUtilsTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/xml/codec/XmlUtilsTest.java similarity index 98% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/XmlUtilsTest.java rename to opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/xml/codec/XmlUtilsTest.java index a408e1d55a..0688bfbc5c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/XmlUtilsTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/xml/codec/XmlUtilsTest.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.remote.rpc.utils; +package org.opendaylight.controller.xml.codec; import com.google.common.collect.ImmutableList; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf index e69de29bb2..0392dec3dd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/application.conf @@ -0,0 +1,8 @@ +testsystem { + + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 10 + mailbox-push-timeout-time = 100ms + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/org/opendaylight/controller/remote/rpc/utils/rpcTest.yang b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/org/opendaylight/controller/xml/codec/rpcTest.yang similarity index 100% rename from opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/org/opendaylight/controller/remote/rpc/utils/rpcTest.yang rename to opendaylight/md-sal/sal-clustering-commons/src/test/resources/org/opendaylight/controller/xml/codec/rpcTest.yang diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf new file mode 100644 index 0000000000..3481bae8ae --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/reference.conf @@ -0,0 +1,8 @@ +testsystem { + + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } +} \ No newline at end of file diff --git 
a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 648e8d23d0..9c5129d6a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -135,6 +135,11 @@ 1.1-SNAPSHOT + + com.codahale.metrics + metrics-core + 3.0.1 + junit @@ -168,10 +173,11 @@ ${project.groupId}.${project.artifactId} - !*snappy;!org.jboss.*;* + !*snappy;!org.jboss.*;!com.jcraft.*;* sal-clustering-commons; sal-akka-raft; + *metrics*; !sal*; !*config-api*; !*testkit*; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java index ac01f42a7f..b258c4466a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java @@ -18,7 +18,7 @@ public abstract class AbstractUntypedActor extends UntypedActor { Logging.getLogger(getContext().system(), this); - public AbstractUntypedActor(){ + public AbstractUntypedActor() { LOG.debug("Actor created {}", getSelf()); getContext(). system(). @@ -29,16 +29,18 @@ public abstract class AbstractUntypedActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { LOG.debug("Received message {}", message.getClass().getSimpleName()); handleReceive(message); - LOG.debug("Done handling message {}", message.getClass().getSimpleName()); + LOG.debug("Done handling message {}", + message.getClass().getSimpleName()); } protected abstract void handleReceive(Object message) throws Exception; - protected void ignoreMessage(Object message){ + protected void ignoreMessage(Object message) { LOG.debug("Unhandled message {} ", message); } - protected void unknownMessage(Object message) throws Exception{ + protected void unknownMessage(Object message) throws Exception { + LOG.debug("Received unhandled message {}", message); unhandled(message); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 40e045f18e..404a4e0203 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSystem; + import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -21,14 +20,13 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy import 
org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.util.PropertyUtils; -import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -41,38 +39,14 @@ import org.slf4j.LoggerFactory; */ public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable { - private static final Logger - LOG = LoggerFactory.getLogger(DistributedDataStore.class); - - private static final String EXECUTOR_MAX_POOL_SIZE_PROP = - "mdsal.dist-datastore-executor-pool.size"; - private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10; - - private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP = - "mdsal.dist-datastore-executor-queue.size"; - private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000; + private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private final ActorContext actorContext; private SchemaContext schemaContext; - /** - * Executor used to run FutureTask's - * - * This is typically used when we need to make a request to an actor and - * wait for it's response and the consumer needs to be provided a Future. 
- */ - private final ListeningExecutorService executor = - MoreExecutors.listeningDecorator( - SpecialExecutors.newBlockingBoundedFastThreadPool( - PropertyUtils.getIntSystemProperty( - EXECUTOR_MAX_POOL_SIZE_PROP, - DEFAULT_EXECUTOR_MAX_POOL_SIZE), - PropertyUtils.getIntSystemProperty( - EXECUTOR_MAX_QUEUE_SIZE_PROP, - DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore")); - - public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) { + public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, + Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); Preconditions.checkNotNull(type, "type should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); @@ -84,7 +58,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au LOG.info("Creating ShardManager : {}", shardManagerId); this.actorContext = new ActorContext(actorSystem, actorSystem - .actorOf(ShardManager.props(type, cluster, configuration), + .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties), shardManagerId ), cluster, configuration); } @@ -93,15 +67,16 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au } + @SuppressWarnings("unchecked") @Override - public >> ListenerRegistration registerChangeListener( + public >> + ListenerRegistration registerChangeListener( YangInstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { Preconditions.checkNotNull(path, "path should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf( @@ -110,10 +85,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); Object result = actorContext.executeLocalShardOperation(shardName, - new RegisterChangeListener(path, dataChangeListenerActor.path(), - scope), - ActorContext.ASK_DURATION - ); + new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + ActorContext.ASK_DURATION); if (result != null) { RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; @@ -125,34 +98,31 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au LOG.debug( "No local shard for shardName {} was found so returning a noop registration", shardName); + return new NoOpDataChangeListenerRegistration(listener); } - - - - @Override public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(actorContext, executor, schemaContext); + return new TransactionChainProxy(actorContext, schemaContext); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, - executor, schemaContext); + schemaContext); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY, - executor, schemaContext); + schemaContext); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE, - executor, 
schemaContext); + schemaContext); } @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index 6d87271f00..a1a3e87510 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -9,19 +9,22 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; + import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.controller.sal.core.api.model.SchemaService; public class DistributedDataStoreFactory { - public static DistributedDataStore createInstance(String name, SchemaService schemaService){ + public static DistributedDataStore createInstance(String name, SchemaService schemaService, + InMemoryDOMDataStoreConfigProperties dataStoreProperties) { + ActorSystem actorSystem = ActorSystemFactory.getInstance(); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); final DistributedDataStore dataStore = - new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),config ); - ShardStrategyFactory.setConfiguration(config); - schemaService - .registerSchemaContextListener(dataStore); + new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), + config, dataStoreProperties ); + ShardStrategyFactory.setConfiguration(config); + schemaService.registerSchemaContextListener(dataStore); return dataStore; - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 63b26331a5..75f540ade0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -17,6 +17,8 @@ import akka.japi.Creator; import akka.serialization.Serialization; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -41,6 +43,7 @@ import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; 
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -92,7 +95,8 @@ public class Shard extends RaftActor { private final List dataChangeListeners = new ArrayList<>(); - private Shard(ShardIdentifier name, Map peerAddresses) { + private Shard(ShardIdentifier name, Map peerAddresses, + InMemoryDOMDataStoreConfigProperties dataStoreProperties) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); this.name = name; @@ -103,16 +107,18 @@ public class Shard extends RaftActor { LOG.info("Shard created : {} persistent : {}", name, persistent); - store = InMemoryDOMDataStoreFactory.create(name.toString(), null); + store = InMemoryDOMDataStoreFactory.create(name.toString(), null, dataStoreProperties); shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString()); } - private static Map mapPeerAddresses(Map peerAddresses){ - Map map = new HashMap<>(); + private static Map mapPeerAddresses( + Map peerAddresses) { + Map map = new HashMap<>(); - for(Map.Entry entry : peerAddresses.entrySet()){ + for (Map.Entry entry : peerAddresses + .entrySet()) { map.put(entry.getKey().toString(), entry.getValue()); } @@ -123,15 +129,17 @@ public class Shard extends RaftActor { public static Props props(final ShardIdentifier name, - final Map peerAddresses) { + final Map peerAddresses, + final InMemoryDOMDataStoreConfigProperties dataStoreProperties) { Preconditions.checkNotNull(name, "name should not be null"); - Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); + Preconditions + .checkNotNull(peerAddresses, "peerAddresses should not be null"); return Props.create(new Creator() { @Override public Shard create() throws Exception { - return new Shard(name, peerAddresses); + return new Shard(name, peerAddresses, dataStoreProperties); } }); @@ -164,14 +172,16 @@ public class Shard extends RaftActor { } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; - setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress()); + setPeerAddress(resolved.getPeerId().toString(), + resolved.getPeerAddress()); } else { super.onReceiveCommand(message); } } private ActorRef createTypedTransactionActor( - CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) { + CreateTransaction createTransaction, + ShardTransactionIdentifier transactionId) { if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { @@ -203,24 +213,26 @@ public class Shard extends RaftActor { .props(store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId.toString()); } else { - // FIXME: This does not seem right throw new IllegalArgumentException( - "CreateTransaction message has unidentified transaction type=" + "Shard="+name + ":CreateTransaction message has unidentified transaction type=" + createTransaction.getTransactionType()); } } private void createTransaction(CreateTransaction createTransaction) { - ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build(); + ShardTransactionIdentifier transactionId = + ShardTransactionIdentifier.builder() + .remoteTransactionId(createTransaction.getTransactionId()) + .build(); LOG.debug("Creating transaction : {} ", transactionId); ActorRef transactionActor = 
createTypedTransactionActor(createTransaction, transactionId); getSender() .tell(new CreateTransactionReply( - Serialization.serializedActorPath(transactionActor), - createTransaction.getTransactionId()).toSerializable(), + Serialization.serializedActorPath(transactionActor), + createTransaction.getTransactionId()).toSerializable(), getSelf()); } @@ -255,22 +267,21 @@ public class Shard extends RaftActor { final ListenableFuture future = cohort.commit(); final ActorRef self = getSelf(); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender - .tell(new CommitTransactionReply().toSerializable(), - self); - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(new Date()); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - sender.tell(new akka.actor.Status.Failure(e),self); - } + + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Void v) { + sender.tell(new CommitTransactionReply().toSerializable(),self); + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(new Date()); } - }, getContext().dispatcher()); + + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during commit"); + shardMBean.incrementFailedTransactionsCount(); + sender.tell(new akka.actor.Status.Failure(t), self); + } + }); + } private void handleForwardedCommit(ForwardedCommitTransaction message) { @@ -329,7 +340,7 @@ public class Shard extends RaftActor { LOG.debug( "registerDataChangeListener sending reply, listenerRegistrationPath = {} " - , listenerRegistration.path().toString()); + , listenerRegistration.path().toString()); getSender() .tell(new RegisterChangeListenerReply(listenerRegistration.path()), @@ -370,7 +381,7 @@ public class Shard extends RaftActor { // Update stats ReplicatedLogEntry lastLogEntry = getLastLogEntry(); - if(lastLogEntry != null){ + if (lastLogEntry != null) { shardMBean.setLastLogIndex(lastLogEntry.getIndex()); shardMBean.setLastLogTerm(lastLogEntry.getTerm()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 6162a0327c..3396eb5564 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -30,6 +30,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; + import scala.concurrent.duration.Duration; import java.util.ArrayList; @@ -68,15 +70,19 @@ public class ShardManager extends AbstractUntypedActor { private ShardManagerInfoMBean mBean; + private final InMemoryDOMDataStoreConfigProperties dataStoreProperties; + /** * @param type defines the kind of data that goes into shards created by this shard manager. 
Examples of type would be * configuration or operational */ - private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { + private ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + InMemoryDOMDataStoreConfigProperties dataStoreProperties) { this.type = Preconditions.checkNotNull(type, "type should not be null"); this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); + this.dataStoreProperties = dataStoreProperties; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -88,7 +94,8 @@ public class ShardManager extends AbstractUntypedActor { public static Props props(final String type, final ClusterWrapper cluster, - final Configuration configuration) { + final Configuration configuration, + final InMemoryDOMDataStoreConfigProperties dataStoreProperties) { Preconditions.checkNotNull(type, "type should not be null"); Preconditions.checkNotNull(cluster, "cluster should not be null"); @@ -98,7 +105,7 @@ public class ShardManager extends AbstractUntypedActor { @Override public ShardManager create() throws Exception { - return new ShardManager(type, cluster, configuration); + return new ShardManager(type, cluster, configuration, dataStoreProperties); } }); } @@ -243,7 +250,7 @@ public class ShardManager extends AbstractUntypedActor { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); ActorRef actor = getContext() - .actorOf(Shard.props(shardId, peerAddresses), + .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties), shardId.toString()); localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); @@ -283,10 +290,12 @@ public class ShardManager extends AbstractUntypedActor { @Override public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, Duration.create("1 minute"), new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { + LOG.warning("Supervisor Strategy of resume applied {}",t); return SupervisorStrategy.resume(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index 97bb196f9f..49c7b7e78f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -53,7 +53,7 @@ public class ShardReadWriteTransaction extends ShardTransaction { } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { mergeData(transaction, MergeData.fromSerializable(message, schemaContext)); } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - deleteData(transaction,DeleteData.fromSerizalizable(message)); + deleteData(transaction,DeleteData.fromSerializable(message)); } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { readyTransaction(transaction,new ReadyTransaction()); } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) { diff --git 
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 91e578b46d..b01fe7d4ac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -50,7 +50,7 @@ public class ShardWriteTransaction extends ShardTransaction { } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { mergeData(transaction, MergeData.fromSerializable(message, schemaContext)); } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - deleteData(transaction,DeleteData.fromSerizalizable(message)); + deleteData(transaction,DeleteData.fromSerializable(message)); } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { readyTransaction(transaction,new ReadyTransaction()); }else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index 500b73ce9d..25705bff41 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -14,6 +14,8 @@ import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -26,8 +28,6 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import java.util.concurrent.ExecutionException; - public class ThreePhaseCommitCohort extends AbstractUntypedActor { private final DOMStoreThreePhaseCommitCohort cohort; private final ActorRef shardActor; @@ -58,13 +58,17 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { @Override public void handleReceive(Object message) throws Exception { - if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + if (message.getClass() + .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { canCommit(new CanCommitTransaction()); - } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) { + } else if (message.getClass() + .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) { preCommit(new PreCommitTransaction()); - } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + } else if (message.getClass() + .equals(CommitTransaction.SERIALIZABLE_CLASS)) { commit(new CommitTransaction()); - } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + } else if 
(message.getClass() + .equals(AbortTransaction.SERIALIZABLE_CLASS)) { abort(new AbortTransaction()); } else { unknownMessage(message); @@ -76,17 +80,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { final ActorRef sender = getSender(); final ActorRef self = getSelf(); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new AbortTransactionReply().toSerializable(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when aborting"); - } + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Void v) { + sender + .tell(new AbortTransactionReply().toSerializable(), + self); + } + + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during abort"); + sender + .tell(new akka.actor.Status.Failure(t), getSelf()); } - }, getContext().dispatcher()); + }); } private void commit(CommitTransaction message) { @@ -103,18 +109,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { final ListenableFuture future = cohort.preCommit(); final ActorRef sender = getSender(); final ActorRef self = getSelf(); + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Void v) { + sender + .tell(new PreCommitTransactionReply().toSerializable(), + self); + } - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new PreCommitTransactionReply().toSerializable(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when preCommitting"); - } + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during pre-commit"); + sender + .tell(new akka.actor.Status.Failure(t), getSelf()); } - }, getContext().dispatcher()); + }); } @@ -122,18 +129,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { final ListenableFuture future = cohort.canCommit(); final ActorRef sender = getSender(); final ActorRef self = getSelf(); + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Boolean canCommit) { + sender.tell(new CanCommitTransactionReply(canCommit) + .toSerializable(), self); + } - future.addListener(new Runnable() { - @Override - public void run() { - try { - Boolean canCommit = future.get(); - sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when checking canCommit"); - } + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during canCommit"); + sender + .tell(new akka.actor.Status.Failure(t), getSelf()); } - }, getContext().dispatcher()); + }); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 5b447943ea..fc455b193e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -10,11 +10,13 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorSelection; +import 
akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -28,124 +30,156 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies */ -public class ThreePhaseCommitCohortProxy implements - DOMStoreThreePhaseCommitCohort{ +public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{ - private static final Logger - LOG = LoggerFactory.getLogger(DistributedDataStore.class); + private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private final ActorContext actorContext; private final List cohortPaths; - private final ListeningExecutorService executor; private final String transactionId; - - public ThreePhaseCommitCohortProxy(ActorContext actorContext, - List cohortPaths, - String transactionId, - ListeningExecutorService executor) { - + public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths, + String transactionId) { this.actorContext = actorContext; this.cohortPaths = cohortPaths; this.transactionId = transactionId; - this.executor = executor; } - @Override public ListenableFuture canCommit() { + @Override + public ListenableFuture canCommit() { LOG.debug("txn {} canCommit", transactionId); - Callable call = new Callable() { + Future> combinedFuture = + invokeCohorts(new CanCommitTransaction().toSerializable()); + + final SettableFuture returnFuture = SettableFuture.create(); + + combinedFuture.onComplete(new OnComplete>() { @Override - public Boolean call() throws Exception { - for(ActorPath actorPath : cohortPaths){ - - Object message = new CanCommitTransaction().toSerializable(); - LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); - - ActorSelection cohort = actorContext.actorSelection(actorPath); - - try { - Object response = - actorContext.executeRemoteOperation(cohort, - message, - ActorContext.ASK_DURATION); - - if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { - CanCommitTransactionReply reply = - CanCommitTransactionReply.fromSerializable(response); - if (!reply.getCanCommit()) { - return false; - } + public void onComplete(Throwable failure, Iterable responses) throws Throwable { + if(failure != null) { + returnFuture.setException(failure); + return; + } + + boolean result = true; + for(Object response: responses) { + if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { + CanCommitTransactionReply reply = + CanCommitTransactionReply.fromSerializable(response); + if (!reply.getCanCommit()) { + result = false; + break; } - } catch(RuntimeException e){ - // FIXME : Need to properly handle this - LOG.error("Unexpected Exception", e); - return false; + } else { + LOG.error("Unexpected response type 
{}", response.getClass()); + returnFuture.setException(new IllegalArgumentException( + String.format("Unexpected response type {}", response.getClass()))); + return; } } - return true; + returnFuture.set(Boolean.valueOf(result)); } - }; + }, actorContext.getActorSystem().dispatcher()); + + return returnFuture; + } + + private Future> invokeCohorts(Object message) { + List> futureList = Lists.newArrayListWithCapacity(cohortPaths.size()); + for(ActorPath actorPath : cohortPaths) { + + LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); - return executor.submit(call); + ActorSelection cohort = actorContext.actorSelection(actorPath); + + futureList.add(actorContext.executeRemoteOperationAsync(cohort, message, + ActorContext.ASK_DURATION)); + } + + return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); } - @Override public ListenableFuture preCommit() { + @Override + public ListenableFuture preCommit() { LOG.debug("txn {} preCommit", transactionId); - return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS); + return voidOperation(new PreCommitTransaction().toSerializable(), + PreCommitTransactionReply.SERIALIZABLE_CLASS, true); } - @Override public ListenableFuture abort() { + @Override + public ListenableFuture abort() { LOG.debug("txn {} abort", transactionId); - return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS); + + // Note - we pass false for propagateException. In the front-end data broker, this method + // is called when one of the 3 phases fails with an exception. We'd rather have that + // original exception propagated to the client. If our abort fails and we propagate the + // exception then that exception will supersede and suppress the original exception. But + // it's the original exception that is the root cause and of more interest to the client. 
+ + return voidOperation(new AbortTransaction().toSerializable(), + AbortTransactionReply.SERIALIZABLE_CLASS, false); } - @Override public ListenableFuture commit() { + @Override + public ListenableFuture commit() { LOG.debug("txn {} commit", transactionId); - return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS); + return voidOperation(new CommitTransaction().toSerializable(), + CommitTransactionReply.SERIALIZABLE_CLASS, true); } - private ListenableFuture voidOperation(final Object message, final Class expectedResponseClass){ - Callable call = new Callable() { - - @Override public Void call() throws Exception { - for(ActorPath actorPath : cohortPaths){ - ActorSelection cohort = actorContext.actorSelection(actorPath); - - LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); - - try { - Object response = - actorContext.executeRemoteOperation(cohort, - message, - ActorContext.ASK_DURATION); - - if (response != null && !response.getClass() - .equals(expectedResponseClass)) { - throw new RuntimeException( - String.format( - "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s", - expectedResponseClass.toString(), - response.getClass().toString()) - ); + private ListenableFuture voidOperation(final Object message, + final Class expectedResponseClass, final boolean propagateException) { + + Future> combinedFuture = invokeCohorts(message); + + final SettableFuture returnFuture = SettableFuture.create(); + + combinedFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable responses) throws Throwable { + + Throwable exceptionToPropagate = failure; + if(exceptionToPropagate == null) { + for(Object response: responses) { + if(!response.getClass().equals(expectedResponseClass)) { + exceptionToPropagate = new IllegalArgumentException( + String.format("Unexpected response type {}", + response.getClass())); + break; } - } catch(TimeoutException e){ - LOG.error(String.format("A timeout occurred when processing operation : %s", message)); } } - return null; + + if(exceptionToPropagate != null) { + if(propagateException) { + // We don't log the exception here to avoid redundant logging since we're + // propagating to the caller in MD-SAL core who will log it. + returnFuture.setException(exceptionToPropagate); + } else { + // Since the caller doesn't want us to propagate the exception we'll also + // not log it normally. But it's usually not good to totally silence + // exceptions so we'll log it to debug level. 
+ LOG.debug(String.format("%s failed", message.getClass().getSimpleName()), + exceptionToPropagate); + returnFuture.set(null); + } + } else { + returnFuture.set(null); + } } - }; + }, actorContext.getActorSystem().dispatcher()); - return executor.submit(call); + return returnFuture; } public List getCohortPaths() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 5e9defa5b5..c4ec760b40 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -15,39 +15,34 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import com.google.common.util.concurrent.ListeningExecutorService; - /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; - private final ListeningExecutorService transactionExecutor; private final SchemaContext schemaContext; - public TransactionChainProxy(ActorContext actorContext, ListeningExecutorService transactionExecutor, - SchemaContext schemaContext) { + public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) { this.actorContext = actorContext; - this.transactionExecutor = transactionExecutor; this.schemaContext = schemaContext; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, schemaContext); + TransactionProxy.TransactionType.READ_ONLY, schemaContext); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor, schemaContext); + TransactionProxy.TransactionType.WRITE_ONLY, schemaContext); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE, transactionExecutor, schemaContext); + TransactionProxy.TransactionType.READ_WRITE, schemaContext); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 95862ae9d9..5b5b1296af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -12,13 +12,14 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.OnComplete; + import com.google.common.base.Optional; import com.google.common.base.Preconditions; import 
com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListeningExecutorService; -import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; -import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; +import com.google.common.util.concurrent.SettableFuture; + import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -44,11 +45,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; /** @@ -80,25 +82,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final TransactionIdentifier identifier; - private final ListeningExecutorService executor; private final SchemaContext schemaContext; + private boolean inReadyState; - public TransactionProxy( - ActorContext actorContext, - TransactionType transactionType, - ListeningExecutorService executor, - SchemaContext schemaContext - ) { + public TransactionProxy(ActorContext actorContext, TransactionType transactionType, + SchemaContext schemaContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); - this.executor = Preconditions.checkNotNull(executor, "executor should not be null"); this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ memberName = "UNKNOWN-MEMBER"; } - this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build(); + + this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( + counter.getAndIncrement()).build(); LOG.debug("Created txn {}", identifier); @@ -108,6 +107,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { + Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, + "Read operation on write-only transaction is not allowed"); + LOG.debug("txn {} read {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -115,8 +117,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return transactionContext(path).readData(path); } - @Override public CheckedFuture exists( - YangInstanceIdentifier path) { + @Override + public CheckedFuture exists(YangInstanceIdentifier path) { + + Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, + "Exists operation on write-only transaction is not allowed"); + LOG.debug("txn {} exists {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -124,9 +130,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return transactionContext(path).dataExists(path); } + private void 
checkModificationState() { + Preconditions.checkState(transactionType != TransactionType.READ_ONLY, + "Modification operation on read-only transaction is not allowed"); + Preconditions.checkState(!inReadyState, + "Transaction is sealed - further modifications are allowed"); + } + @Override public void write(YangInstanceIdentifier path, NormalizedNode data) { + checkModificationState(); + LOG.debug("txn {} write {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -137,6 +152,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void merge(YangInstanceIdentifier path, NormalizedNode data) { + checkModificationState(); + LOG.debug("txn {} merge {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -147,6 +164,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void delete(YangInstanceIdentifier path) { + checkModificationState(); + LOG.debug("txn {} delete {}", identifier, path); createTransactionIfMissing(actorContext, path); @@ -156,25 +175,36 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public DOMStoreThreePhaseCommitCohort ready() { + + checkModificationState(); + + inReadyState = true; + List cohortPaths = new ArrayList<>(); - LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); + LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, + remoteTransactionPaths.size()); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { - LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); + LOG.debug("txn {} Readying transaction for shard {}", identifier, + transactionContext.getShardName()); Object result = transactionContext.readyTransaction(); if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){ - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result); - String resolvedCohortPath = transactionContext - .getResolvedCohortPath(reply.getCohortPath().toString()); + ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable( + actorContext.getActorSystem(),result); + String resolvedCohortPath = transactionContext.getResolvedCohortPath( + reply.getCohortPath().toString()); cohortPaths.add(actorContext.actorFor(resolvedCohortPath)); + } else { + LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS, + result.getClass()); } } - return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString()); } @Override @@ -213,8 +243,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(), ActorContext.ASK_DURATION); - if (response.getClass() - .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { + if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -229,11 +258,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { transactionActor); remoteTransactionPaths.put(shardName, transactionContext); + } else { + LOG.error("Was expecting {} but got {}", 
CreateTransactionReply.SERIALIZABLE_CLASS, + response.getClass()); } - } catch(TimeoutException | PrimaryNotFoundException e){ + } catch(Exception e){ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); - remoteTransactionPaths.put(shardName, - new NoOpTransactionContext(shardName)); + remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e)); } } @@ -272,7 +303,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { this.actor = actor; } - @Override public String getShardName() { + @Override + public String getShardName() { return shardName; } @@ -280,96 +312,105 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return actor; } - @Override public String getResolvedCohortPath(String cohortPath) { + @Override + public String getResolvedCohortPath(String cohortPath) { return actorContext.resolvePath(actorPath, cohortPath); } - @Override public void closeTransaction() { - getActor().tell( - new CloseTransaction().toSerializable(), null); + @Override + public void closeTransaction() { + actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); } - @Override public Object readyTransaction() { + @Override + public Object readyTransaction() { return actorContext.executeRemoteOperation(getActor(), - new ReadyTransaction().toSerializable(), - ActorContext.ASK_DURATION - ); - + new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION); } - @Override public void deleteData(YangInstanceIdentifier path) { - getActor().tell(new DeleteData(path).toSerializable(), null); + @Override + public void deleteData(YangInstanceIdentifier path) { + actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() ); } - @Override public void mergeData(YangInstanceIdentifier path, - NormalizedNode data) { - getActor() - .tell(new MergeData(path, data, schemaContext).toSerializable(), - null); + @Override + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { + actorContext.sendRemoteOperationAsync(getActor(), + new MergeData(path, data, schemaContext).toSerializable()); } @Override public CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path) { - Callable>> call = - new Callable>>() { - - @Override public Optional> call() - throws Exception { - Object response = actorContext - .executeRemoteOperation(getActor(), - new ReadData(path).toSerializable(), - ActorContext.ASK_DURATION); - if (response.getClass() - .equals(ReadDataReply.SERIALIZABLE_CLASS)) { - ReadDataReply reply = ReadDataReply - .fromSerializable(schemaContext, path, - response); + final SettableFuture>> returnFuture = SettableFuture.create(); + + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) throws Throwable { + if(failure != null) { + returnFuture.setException(new ReadFailedException( + "Error reading data for path " + path, failure)); + } else { + if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { + ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, + path, response); if (reply.getNormalizedNode() == null) { - return Optional.absent(); + returnFuture.set(Optional.>absent()); + } else { + returnFuture.set(Optional.>of( + reply.getNormalizedNode())); } - return Optional.>of( - reply.getNormalizedNode()); + } else { + returnFuture.setException(new ReadFailedException( + "Invalid response reading data for path " + path)); } - - throw 
new ReadFailedException("Read Failed " + path); } - }; + } + }; - return MappingCheckedFuture - .create(executor.submit(call), ReadFailedException.MAPPER); - } + Future future = actorContext.executeRemoteOperationAsync(getActor(), + new ReadData(path).toSerializable(), ActorContext.ASK_DURATION); + future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); - @Override public void writeData(YangInstanceIdentifier path, - NormalizedNode data) { - getActor() - .tell(new WriteData(path, data, schemaContext).toSerializable(), - null); + return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } - @Override public CheckedFuture dataExists( - final YangInstanceIdentifier path) { - - Callable call = new Callable() { - - @Override public Boolean call() throws Exception { - Object o = actorContext.executeRemoteOperation(getActor(), - new DataExists(path).toSerializable(), - ActorContext.ASK_DURATION - ); - + @Override + public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + actorContext.sendRemoteOperationAsync(getActor(), + new WriteData(path, data, schemaContext).toSerializable()); + } - if (DataExistsReply.SERIALIZABLE_CLASS - .equals(o.getClass())) { - return DataExistsReply.fromSerializable(o).exists(); + @Override + public CheckedFuture dataExists( + final YangInstanceIdentifier path) { + + final SettableFuture returnFuture = SettableFuture.create(); + + OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) throws Throwable { + if(failure != null) { + returnFuture.setException(new ReadFailedException( + "Error checking exists for path " + path, failure)); + } else { + if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { + returnFuture.set(Boolean.valueOf(DataExistsReply. 
+ fromSerializable(response).exists())); + } else { + returnFuture.setException(new ReadFailedException( + "Invalid response checking exists for path " + path)); + } } - - throw new ReadFailedException("Exists Failed " + path); } }; - return MappingCheckedFuture - .create(executor.submit(call), ReadFailedException.MAPPER); + + Future future = actorContext.executeRemoteOperationAsync(getActor(), + new DataExists(path).toSerializable(), ActorContext.ASK_DURATION); + future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + + return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } } @@ -379,22 +420,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final String shardName; + private final Exception failure; private ActorRef cohort; - public NoOpTransactionContext(String shardName){ + public NoOpTransactionContext(String shardName, Exception failure){ this.shardName = shardName; + this.failure = failure; } - @Override public String getShardName() { + + @Override + public String getShardName() { return shardName; } - @Override public String getResolvedCohortPath(String cohortPath) { + @Override + public String getResolvedCohortPath(String cohortPath) { return cohort.path().toString(); } - @Override public void closeTransaction() { + @Override + public void closeTransaction() { LOG.warn("txn {} closeTransaction called", identifier); } @@ -404,11 +451,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return new ReadyTransactionReply(cohort.path()).toSerializable(); } - @Override public void deleteData(YangInstanceIdentifier path) { + @Override + public void deleteData(YangInstanceIdentifier path) { LOG.warn("txt {} deleteData called path = {}", identifier, path); } - @Override public void mergeData(YangInstanceIdentifier path, + @Override + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.warn("txn {} mergeData called path = {}", identifier, path); } @@ -417,8 +466,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.warn("txn {} readData called path = {}", identifier, path); - return Futures.immediateCheckedFuture( - Optional.>absent()); + return Futures.immediateFailedCheckedFuture(new ReadFailedException( + "Error reading data for path " + path, failure)); } @Override public void writeData(YangInstanceIdentifier path, @@ -429,10 +478,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public CheckedFuture dataExists( YangInstanceIdentifier path) { LOG.warn("txn {} dataExists called path = {}", identifier, path); - - // Returning false instead of an exception to keep this aligned with - // read - return Futures.immediateCheckedFuture(false); + return Futures.immediateFailedCheckedFuture(new ReadFailedException( + "Error checking exists for path " + path, failure)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java index 17861a5a68..9ae851e76c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java @@ -31,7 +31,7 @@ public class DeleteData implements SerializableMessage { .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build(); } - public static DeleteData fromSerizalizable(Object serializable){ + public static DeleteData fromSerializable(Object serializable){ ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable; return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 4706c66e25..e12a9663d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -14,6 +14,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.util.Timeout; + import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -22,9 +23,9 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -54,8 +55,6 @@ public class ActorContext { private final ClusterWrapper clusterWrapper; private final Configuration configuration; - private SchemaContext schemaContext = null; - public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -174,6 +173,33 @@ public class ActorContext { } } + /** + * Execute an operation on a remote actor asynchronously. + * + * @param actor the ActorSelection + * @param message the message to send + * @param duration the maximum amount of time to send he message + * @return a Future containing the eventual result + */ + public Future executeRemoteOperationAsync(ActorSelection actor, Object message, + FiniteDuration duration) { + + LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); + + return ask(actor, message, new Timeout(duration)); + } + + /** + * Sends an operation to be executed by a remote actor asynchronously without waiting for a + * reply (essentially set and forget). + * + * @param actor the ActorSelection + * @param message the message to send + */ + public void sendRemoteOperationAsync(ActorSelection actor, Object message) { + actor.tell(message, ActorRef.noSender()); + } + /** * Execute an operation on the primary for a given shard *

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index 87a621f9d3..592bc49d9e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -1,6 +1,7 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; public class DistributedConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule { @@ -25,8 +26,10 @@ public class DistributedConfigDataStoreProviderModule extends @Override public java.lang.AutoCloseable createInstance() { - return DistributedDataStoreFactory - .createInstance("config", getConfigSchemaServiceDependency()); + return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(), + InMemoryDOMDataStoreConfigProperties.create(getMaxShardDataChangeExecutorPoolSize(), + getMaxShardDataChangeExecutorQueueSize(), + getMaxShardDataChangeListenerQueueSize())); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 6af2748a8f..9eb72d64d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -1,6 +1,7 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; public class DistributedOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule { @@ -25,8 +26,11 @@ public class DistributedOperationalDataStoreProviderModule extends @Override public java.lang.AutoCloseable createInstance() { - return DistributedDataStoreFactory - .createInstance("operational", getOperationalSchemaServiceDependency()); + return DistributedDataStoreFactory.createInstance("operational", + getOperationalSchemaServiceDependency(), + InMemoryDOMDataStoreConfigProperties.create(getMaxShardDataChangeExecutorPoolSize(), + 
getMaxShardDataChangeExecutorQueueSize(), + getMaxShardDataChangeListenerQueueSize())); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index 6f355cbe63..ecb823e624 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -41,28 +41,64 @@ module distributed-datastore-provider { case distributed-config-datastore-provider { when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'"; container config-schema-service { - uses config:service-ref { - refine type { - mandatory false; - config:required-identity sal:schema-service; - } - } - } + uses config:service-ref { + refine type { + mandatory false; + config:required-identity sal:schema-service; + } + } + } + + leaf max-shard-data-change-executor-queue-size { + default 1000; + type uint16; + description "The maximum queue size for each shard's data store data change notification executor."; + } + + leaf max-shard-data-change-executor-pool-size { + default 20; + type uint16; + description "The maximum thread pool size for each shard's data store data change notification executor."; + } + + leaf max-shard-data-change-listener-queue-size { + default 1000; + type uint16; + description "The maximum queue size for each shard's data store data change listeners."; + } } } // Augments the 'configuration' choice node under modules/module. - augment "/config:modules/config:module/config:configuration" { - case distributed-operational-datastore-provider { - when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'"; + augment "/config:modules/config:module/config:configuration" { + case distributed-operational-datastore-provider { + when "/config:modules/config:module/config:type = 'distributed-operational-datastore-provider'"; container operational-schema-service { - uses config:service-ref { - refine type { - mandatory false; - config:required-identity sal:schema-service; - } - } - } + uses config:service-ref { + refine type { + mandatory false; + config:required-identity sal:schema-service; + } + } + } + + leaf max-shard-data-change-executor-queue-size { + default 1000; + type uint16; + description "The maximum queue size for each shard's data store data change notification executor."; + } + + leaf max-shard-data-change-executor-pool-size { + default 20; + type uint16; + description "The maximum thread pool size for each shard's data store data change notification executor."; + } + + leaf max-shard-data-change-listener-queue-size { + default 1000; + type uint16; + description "The maximum queue size for each shard's data store data change listeners."; + } } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index 319451f8f0..036b00a4c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -58,7 +58,7 @@ public class 
BasicIntegrationTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null); final ActorRef shard = getSystem().actorOf(props); new Within(duration("5 seconds")) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index fc527b6bff..49408b7410 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -72,7 +72,7 @@ public class DistributedDataStoreIntegrationTest { protected void run() { try { final DistributedDataStore distributedDataStore = - new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration); + new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration, null); distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); @@ -154,7 +154,7 @@ public class DistributedDataStoreIntegrationTest { try { final DistributedDataStore distributedDataStore = new DistributedDataStore(getSystem(), "config", - new MockClusterWrapper(), configuration); + new MockClusterWrapper(), configuration, null); distributedDataStore.onGlobalContextUpdated( SchemaContextHelper.full()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 406f0ffd9e..69590e62fb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -68,7 +68,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{ ActorSystem actorSystem = mock(ActorSystem.class); new DistributedDataStore(actorSystem, "config", - mock(ClusterWrapper.class), mock(Configuration.class)); + mock(ClusterWrapper.class), mock(Configuration.class), null); verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config")); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index e9ad450ed8..499b4e1f31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -42,7 +42,7 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager .props("config", new MockClusterWrapper(), - new MockConfiguration()); + new 
MockConfiguration(), null); final TestActorRef subject = TestActorRef.create(system, props); @@ -66,7 +66,7 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager .props("config", new MockClusterWrapper(), - new MockConfiguration()); + new MockConfiguration(), null); final TestActorRef subject = TestActorRef.create(system, props); @@ -89,7 +89,7 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager .props("config", new MockClusterWrapper(), - new MockConfiguration()); + new MockConfiguration(), null); final TestActorRef subject = TestActorRef.create(system, props); @@ -124,7 +124,7 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager .props("config", mockClusterWrapper, - new MockConfiguration()); + new MockConfiguration(), null); final TestActorRef subject = TestActorRef.create(system, props); @@ -158,7 +158,7 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager .props("config", new MockClusterWrapper(), - new MockConfiguration()); + new MockConfiguration(), null); final TestActorRef subject = TestActorRef.create(system, props); @@ -196,7 +196,7 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager .props("config", new MockClusterWrapper(), - new MockConfiguration()); + new MockConfiguration(), null); final TestActorRef subject = TestActorRef.create(system, props); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0d86ffb844..7740b8e667 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -40,7 +40,7 @@ public class ShardTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null); final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain"); @@ -96,7 +96,7 @@ public class ShardTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null); final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener"); @@ -154,7 +154,7 @@ public class ShardTest extends AbstractActorTest { ShardIdentifier.builder().memberName("member-1") .shardName("inventory").type("config").build(); - final Props props = Shard.props(identifier, Collections.EMPTY_MAP); + final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null); final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction"); @@ -216,7 +216,7 @@ public class ShardTest extends AbstractActorTest { .shardName("inventory").type("config").build(); peerAddresses.put(identifier, null); - final Props props = Shard.props(identifier, peerAddresses); + final Props props = Shard.props(identifier, peerAddresses, null); final ActorRef subject = 
getSystem().actorOf(props, "testPeerAddressResolved"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 02ceee82e0..16b73040a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue; /** * Covers negative test cases + * * @author Basheeruddin Ahmed */ public class ShardTransactionFailureTest extends AbstractActorTest { @@ -48,7 +49,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private static final ShardIdentifier SHARD_IDENTIFIER = ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); + .shardName("inventory").type("operational").build(); static { store.onGlobalContextUpdated(testSchemaContext); @@ -60,7 +61,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext()); @@ -95,7 +96,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -129,7 +130,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -164,7 +165,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); @@ -203,7 +204,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -241,7 +242,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = 
ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -279,7 +280,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -308,6 +309,4 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } - - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 78895b2366..8f5d0c28d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -62,7 +62,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext); final ActorRef subject = getSystem().actorOf(props, "testReadData"); @@ -104,7 +104,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadDataWhenDataNotFound() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext); final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound"); @@ -147,7 +147,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDataExistsPositive() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext); final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive"); @@ -189,7 +189,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDataExistsNegative() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, + Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext); final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative"); @@ -266,7 +267,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void 
testOnReceiveWriteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -306,7 +307,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveMergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext); final ActorRef subject = @@ -347,7 +348,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -386,7 +387,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -424,7 +425,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -479,7 +480,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { try { - final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext()); final TestActorRef subject = TestActorRef.apply(props,getSystem()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java new file mode 100644 index 0000000000..870889b350 --- /dev/null +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java @@ -0,0 +1,232 @@ +/* + * + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; +import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + + +public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { + + private static ListeningExecutorService storeExecutor = + MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); + + private static final InMemoryDOMDataStore store = + new InMemoryDOMDataStore("OPER", storeExecutor, + MoreExecutors.sameThreadExecutor()); + + private static final SchemaContext testSchemaContext = + TestModel.createTestContext(); + + private static final ShardIdentifier SHARD_IDENTIFIER = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + static { + store.onGlobalContextUpdated(testSchemaContext); + } + + private FiniteDuration ASK_RESULT_DURATION = Duration.create(3000, TimeUnit.MILLISECONDS); + + + @Test(expected = TestException.class) + public void testNegativeAbortResultsInException() throws Exception { + + final ActorRef shard = + getSystem() + .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); + final DOMStoreThreePhaseCommitCohort mockCohort = Mockito + .mock(DOMStoreThreePhaseCommitCohort.class); + final CompositeModification mockComposite = + Mockito.mock(CompositeModification.class); + final Props props = + ThreePhaseCommitCohort.props(mockCohort, shard, 
mockComposite); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeAbortResultsInException"); + + when(mockCohort.abort()).thenReturn( + Futures.immediateFailedFuture(new TestException())); + + Future future = + akka.pattern.Patterns.ask(subject, + ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder() + .build(), 3000); + assertTrue(future.isCompleted()); + + Await.result(future, ASK_RESULT_DURATION); + + + + } + + + @Test(expected = OptimisticLockFailedException.class) + public void testNegativeCanCommitResultsInException() throws Exception { + + final ActorRef shard = + getSystem() + .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); + final DOMStoreThreePhaseCommitCohort mockCohort = Mockito + .mock(DOMStoreThreePhaseCommitCohort.class); + final CompositeModification mockComposite = + Mockito.mock(CompositeModification.class); + final Props props = + ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeCanCommitResultsInException"); + + when(mockCohort.canCommit()).thenReturn( + Futures + .immediateFailedFuture( + new OptimisticLockFailedException("some exception"))); + + Future future = + akka.pattern.Patterns.ask(subject, + ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder() + .build(), 3000); + + + Await.result(future, ASK_RESULT_DURATION); + + } + + + @Test(expected = TestException.class) + public void testNegativePreCommitResultsInException() throws Exception { + + final ActorRef shard = + getSystem() + .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); + final DOMStoreThreePhaseCommitCohort mockCohort = Mockito + .mock(DOMStoreThreePhaseCommitCohort.class); + final CompositeModification mockComposite = + Mockito.mock(CompositeModification.class); + final Props props = + ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativePreCommitResultsInException"); + + when(mockCohort.preCommit()).thenReturn( + Futures + .immediateFailedFuture( + new TestException())); + + Future future = + akka.pattern.Patterns.ask(subject, + ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder() + .build(), 3000); + + Await.result(future, ASK_RESULT_DURATION); + + } + + @Test(expected = TestException.class) + public void testNegativeCommitResultsInException() throws Exception { + + final TestActorRef subject = TestActorRef + .create(getSystem(), + Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null), + "testNegativeCommitResultsInException"); + + final ActorRef shardTransaction = + getSystem().actorOf( + ShardTransaction.props(store.newReadWriteTransaction(), subject, + TestModel.createTestContext())); + + ShardTransactionMessages.WriteData writeData = + ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build()).setNormalizedNode( + NormalizedNodeMessages.Node.newBuilder().build() + + ).build(); + + //This is done so that Modification list is updated which is used during commit + Future future = + akka.pattern.Patterns.ask(shardTransaction, writeData, 3000); + + //ready transaction creates the cohort so that we get into the + //block where in commmit is done + ShardTransactionMessages.ReadyTransaction readyTransaction = + ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + + future = + 
akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000); + + //but when the message is sent it will have the MockCommit object + //so that we can simulate throwing of exception + ForwardedCommitTransaction mockForwardCommitTransaction = + Mockito.mock(ForwardedCommitTransaction.class); + DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction = + Mockito.mock(DOMStoreThreePhaseCommitCohort.class); + when(mockForwardCommitTransaction.getCohort()) + .thenReturn(mockThreePhaseCommitTransaction); + when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures + .immediateFailedFuture( + new TestException())); + Modification mockModification = Mockito.mock( + Modification.class); + when(mockForwardCommitTransaction.getModification()) + .thenReturn(mockModification); + + when(mockModification.toSerializable()).thenReturn( + PersistentMessages.CompositeModification.newBuilder().build()); + + future = + akka.pattern.Patterns.ask(subject, + mockForwardCommitTransaction + , 3000); + Await.result(future, ASK_RESULT_DURATION); + + + } + + private class TestException extends Exception { + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 4eca5671f6..87231f0884 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -1,96 +1,245 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; +import akka.actor.ActorPath; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -import junit.framework.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; -import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Stubber; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; +import 
org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import scala.concurrent.duration.FiniteDuration; -import java.util.Arrays; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertNotNull; +import java.util.List; +import java.util.concurrent.ExecutionException; public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { - private ThreePhaseCommitCohortProxy proxy; - private Props props; - private ActorRef actorRef; - private MockActorContext actorContext; - private final ListeningExecutorService executor = MoreExecutors.listeningDecorator( - Executors.newSingleThreadExecutor()); + @Mock + private ActorContext actorContext; @Before - public void setUp(){ - props = Props.create(MessageCollectorActor.class); - actorRef = getSystem().actorOf(props); - actorContext = new MockActorContext(this.getSystem()); + public void setUp() { + MockitoAnnotations.initMocks(this); - proxy = - new ThreePhaseCommitCohortProxy(actorContext, - Arrays.asList(actorRef.path()), "txn-1", executor); + doReturn(getSystem()).when(actorContext).getActorSystem(); + } + private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) { + List cohorts = Lists.newArrayList(); + for(int i = 1; i <= nCohorts; i++) { + ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path(); + cohorts.add(path); + doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path); + } + + return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1"); } - @After - public void tearDown() { - executor.shutdownNow(); + private void setupMockActorContext(Class requestType, Object... responses) { + Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures + .failed((Throwable) responses[0]) : Futures + .successful(((SerializableMessage) responses[0]).toSerializable())); + + for(int i = 1; i < responses.length; i++) { + stubber = stubber.doReturn(responses[i] instanceof Throwable ? 
Futures + .failed((Throwable) responses[i]) : Futures + .successful(((SerializableMessage) responses[i]).toSerializable())); + } + + stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class), + isA(requestType), any(FiniteDuration.class)); + } + + private void verifyCohortInvocations(int nCohorts, Class requestType) { + verify(actorContext, times(nCohorts)).executeRemoteOperationAsync( + any(ActorSelection.class), isA(requestType), any(FiniteDuration.class)); + } + + @Test + public void testCanCommitWithOneCohort() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(true)); + + ListenableFuture future = proxy.canCommit(); + + assertEquals("canCommit", true, future.get()); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(false)); + + future = proxy.canCommit(); + + assertEquals("canCommit", false, future.get()); + + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); } @Test - public void testCanCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable()); + public void testCanCommitWithMultipleCohorts() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(true), new CanCommitTransactionReply(true)); ListenableFuture future = proxy.canCommit(); - Assert.assertTrue(future.get().booleanValue()); + assertEquals("canCommit", true, future.get()); + verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS); + } + + @Test + public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(3); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new CanCommitTransactionReply(true), new CanCommitTransactionReply(false), + new CanCommitTransactionReply(true)); + + ListenableFuture future = proxy.canCommit(); + + assertEquals("canCommit", false, future.get()); + + verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS); + } + + @Test(expected = ExecutionException.class) + public void testCanCommitWithExceptionFailure() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + + proxy.canCommit().get(); + } + + @Test(expected = ExecutionException.class) + public void testCanCommitWithInvalidResponseType() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, + new PreCommitTransactionReply()); + + proxy.canCommit().get(); } @Test public void testPreCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable()); + ThreePhaseCommitCohortProxy proxy = setupProxy(1); - ListenableFuture future = proxy.preCommit(); + setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, + new PreCommitTransactionReply()); - future.get(); + proxy.preCommit().get(); + verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS); + } + + @Test(expected = ExecutionException.class) + public void testPreCommitWithFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS, 
+ new PreCommitTransactionReply(), new RuntimeException("mock")); + + proxy.preCommit().get(); } @Test public void testAbort() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable()); + ThreePhaseCommitCohortProxy proxy = setupProxy(1); - ListenableFuture future = proxy.abort(); + setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply()); - future.get(); + proxy.abort().get(); + verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); + } + + @Test + public void testAbortWithFailure() throws Exception { + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock")); + + // The exception should not get propagated. + proxy.abort().get(); + + verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS); } @Test public void testCommit() throws Exception { - actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable()); - ListenableFuture future = proxy.commit(); + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), + new CommitTransactionReply()); + + proxy.commit().get(); + + verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS); + } + + @Test(expected = ExecutionException.class) + public void testCommitWithFailure() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(2); - future.get(); + setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(), + new RuntimeException("mock")); + + proxy.commit().get(); + } + + @Test(expected = ExecutionException.class) + public void teseCommitWithInvalidResponseType() throws Exception { + + ThreePhaseCommitCohortProxy proxy = setupProxy(1); + + setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply()); + + proxy.commit().get(); } @Test - public void testGetCohortPaths() throws Exception { - assertNotNull(proxy.getCohortPaths()); + public void testGetCohortPaths() { + + ThreePhaseCommitCohortProxy proxy = setupProxy(2); + + List paths = proxy.getCohortPaths(); + assertNotNull("getCohortPaths returned null", paths); + assertEquals("getCohortPaths size", 2, paths.size()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 62052f38ab..14696f786e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,32 +1,44 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; 
-import com.google.common.util.concurrent.MoreExecutors; -import junit.framework.Assert; -import org.junit.After; + import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; + +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -34,377 +46,433 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.util.List; -import java.util.concurrent.Executors; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; -import static junit.framework.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.argThat; +import static 
org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.isA; + +@SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } + + static interface Invoker { + void invoke(TransactionProxy proxy) throws Exception; + } + private final Configuration configuration = new MockConfiguration(); - private final ActorContext testContext = - new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration ); + @Mock + private ActorContext mockActorContext; - private final ListeningExecutorService transactionExecutor = - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + private SchemaContext schemaContext; + + String memberName = "mock-member"; @Before public void setUp(){ - ShardStrategyFactory.setConfiguration(configuration); - } + MockitoAnnotations.initMocks(this); - @After - public void tearDown() { - transactionExecutor.shutdownNow(); - } + schemaContext = TestModel.createTestContext(); - @Test - public void testRead() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + doReturn(getSystem()).when(mockActorContext).getActorSystem(); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + ShardStrategyFactory.setConfiguration(configuration); + } + private CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + }; + + return argThat(matcher); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DataExists obj = DataExists.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + return argThat(matcher); + } - actorContext.setExecuteRemoteOperationResponse( - new ReadDataReply(TestModel.createTestContext(), null) - .toSerializable()); + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + ReadData obj = ReadData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + return argThat(matcher); + } - Optional> normalizedNodeOptional = read.get(); + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + WriteData obj = WriteData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return 
argThat(matcher); + } - Assert.assertFalse(normalizedNodeOptional.isPresent()); + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + MergeData obj = MergeData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable()); + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DeleteData obj = DeleteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; - read = transactionProxy.read(TestModel.TEST_PATH); + return argThat(matcher); + } - normalizedNodeOptional = read.get(); + private Object readyTxReply(ActorPath path) { + return new ReadyTransactionReply(path).toSerializable(); + } - Assert.assertTrue(normalizedNodeOptional.isPresent()); + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data) + .toSerializable()); } - @Test - public void testExists() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + private FiniteDuration anyDuration() { + return any(FiniteDuration.class); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1").build(); + } + private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + doReturn(getSystem().actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(createTransactionReply(actorRef)).when(mockActorContext). 
+ executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), + eqCreateTransaction(memberName, type), anyDuration()); + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( + anyString(), eq(actorRef.path().toString())); + doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); + + return actorRef; + } - actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable()); + @Test + public void testRead() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - CheckedFuture exists = - transactionProxy.exists(TestModel.TEST_PATH); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertFalse(exists.checkedGet()); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable()); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - exists = transactionProxy.exists(TestModel.TEST_PATH); + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); - Assert.assertTrue(exists.checkedGet()); + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - actorContext.setExecuteRemoteOperationResponse("bad message"); + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - exists = transactionProxy.exists(TestModel.TEST_PATH); + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - try { - exists.checkedGet(); - fail(); - } catch(ReadFailedException e){ - } + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); } @Test(expected = ReadFailedException.class) public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); - - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + setupActorContextWithInitialCreateTransaction(READ_ONLY); + doReturn(Futures.successful(new Object())).when(mockActorContext). 
+ executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - CheckedFuture>, ReadFailedException> - read = transactionProxy.read(TestModel.TEST_PATH); - - read.checkedGet(); + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } - @Test - public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception { - final ActorContext actorContext = mock(ActorContext.class); - - when(actorContext.executeShardOperation(anyString(), any(), any( - FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test")); + @Test(expected = TestException.class) + public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); - - Assert.assertFalse(read.get().isPresent()); - + try { + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } + private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) + throws Throwable { - @Test - public void testReadWhenATimeoutExceptionIsThrown() throws Exception { - final ActorContext actorContext = mock(ActorContext.class); + doThrow(exToThrow).when(mockActorContext).executeShardOperation( + anyString(), any(), anyDuration()); - when(actorContext.executeShardOperation(anyString(), any(), any( - FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason"))); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + try { + invoker.invoke(transactionProxy); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. 
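The reworked tests above replace the old MockActorContext canned responses with Mockito stubbing: executeRemoteOperationAsync is stubbed with doReturn/doThrow, and request messages are matched through custom ArgumentMatcher implementations returned via argThat (eqReadData, eqWriteData, and so on). A minimal, self-contained sketch of that combination follows; it assumes Mockito 1.x (where ArgumentMatcher.matches takes Object, as in the test code above), and the Messaging interface and eqHello helper are hypothetical stand-ins, not part of this patch.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import org.mockito.ArgumentMatcher;

public class MatcherStubbingSketch {

    // Hypothetical stand-in for ActorContext.executeRemoteOperationAsync(selection, message, timeout).
    interface Messaging {
        String send(Object message);
    }

    // Same shape as eqReadData()/eqDeleteData(): an anonymous ArgumentMatcher handed to argThat().
    private static Object eqHello() {
        ArgumentMatcher matcher = new ArgumentMatcher() {
            @Override
            public boolean matches(Object argument) {
                return "hello".equals(argument);
            }
        };
        return argThat(matcher);
    }

    public static void main(String[] args) {
        Messaging messaging = mock(Messaging.class);

        // doReturn(...).when(...) stubbing, as in setupActorContextWithInitialCreateTransaction().
        doReturn("world").when(messaging).send(eqHello());

        assertEquals("world", messaging.send("hello"));
        assertNull(messaging.send("something else")); // unmatched calls fall back to the default answer
    }
}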
+ throw e.getCause(); + } + } + private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { + testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); + } - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + @Test(expected = PrimaryNotFoundException.class) + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test")); + } - Assert.assertFalse(read.get().isPresent()); + @Test(expected = TimeoutException.class) + public void testReadWhenATimeoutExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", + new Exception("reason"))); + } + @Test(expected = TestException.class) + public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TestException()); } @Test - public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception { - final ActorContext actorContext = mock(ActorContext.class); - - when(actorContext.executeShardOperation(anyString(), any(), any( - FiniteDuration.class))).thenThrow(new NullPointerException()); - - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + public void testExists() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - try { - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); - fail("A null pointer exception was expected"); - } catch(NullPointerException e){ + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - } - } + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + assertEquals("Exists response", false, exists); + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - @Test - public void testWrite() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + assertEquals("Exists response", true, exists); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + @Test(expected = PrimaryNotFoundException.class) + public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); 
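The helpers above share one pattern: the operation under test is passed in as a small Invoker callback, the expected ReadFailedException is caught, and its cause is rethrown so @Test(expected = ...) can assert the original failure (PrimaryNotFoundException, TimeoutException, TestException). A stripped-down sketch of that cause-unwrapping idiom, using a hypothetical FailingCall interface and ExecutionException as the wrapper type in place of ReadFailedException:

import java.util.concurrent.ExecutionException;

public class ExpectCauseSketch {

    // Hypothetical callback mirroring the test's Invoker interface.
    interface FailingCall {
        void invoke() throws Exception;
    }

    // Runs the call, expects a wrapper exception, and rethrows its cause for @Test(expected = ...).
    static void invokeAndRethrowCause(FailingCall call) throws Throwable {
        try {
            call.invoke();
            throw new AssertionError("Expected the call to fail");
        } catch (ExecutionException e) { // the tests above use ReadFailedException here
            throw e.getCause();
        }
    }

    public static void main(String[] args) throws Throwable {
        try {
            invokeAndRethrowCause(new FailingCall() {
                @Override
                public void invoke() throws Exception {
                    throw new ExecutionException(new IllegalStateException("primary not found"));
                }
            });
        } catch (IllegalStateException expected) {
            System.out.println("unwrapped cause: " + expected.getMessage());
        }
    }
}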
+ } + }); + } - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + @Test(expected = ReadFailedException.class) + public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertNotNull(messages); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertTrue(messages instanceof List); + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } - List listMessages = (List) messages; + @Test(expected = TestException.class) + public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Assert.assertEquals(1, listMessages.size()); + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); - } + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - private Object createPrimaryFound(ActorRef actorRef) { - return new PrimaryFound(actorRef.path().toString()).toSerializable(); + try { + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } @Test - public void testMerge() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - transactionProxy.merge(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + } - Assert.assertNotNull(messages); + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - Assert.assertTrue(messages instanceof List); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - List listMessages = (List) messages; + NormalizedNode nodeToWrite = 
ImmutableNodes.containerNode(TestModel.TEST_QNAME); - Assert.assertEquals(1, listMessages.size()); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); } @Test public void testDelete() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); transactionProxy.delete(TestModel.TEST_PATH); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); - - Assert.assertNotNull(messages); - - Assert.assertTrue(messages instanceof List); - - List listMessages = (List) messages; - - Assert.assertEquals(1, listMessages.size()); - - Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); } + @SuppressWarnings("unchecked") @Test public void testReady() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable()); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); transactionProxy.read(TestModel.TEST_PATH); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); - + assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); } @Test - public void testGetIdentifier(){ - final Props props = 
Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); - - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); - - Assert.assertNotNull(transactionProxy.getIdentifier()); + public void testGetIdentifier() { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + TransactionProxy.TransactionType.READ_ONLY, schemaContext); + + Object id = transactionProxy.getIdentifier(); + assertNotNull("getIdentifier returned null", id); + assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); } + @SuppressWarnings("unchecked") @Test - public void testClose(){ - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testClose() throws Exception{ + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); transactionProxy.read(TestModel.TEST_PATH); transactionProxy.close(); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); - - Assert.assertNotNull(messages); - - Assert.assertTrue(messages instanceof List); - - List listMessages = (List) messages; - - Assert.assertEquals(1, listMessages.size()); - - Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)); - } - - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .build(); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 5874eccda4..fda9ccdfdb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,11 +1,14 @@ package org.opendaylight.controller.cluster.datastore.utils; +import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; +import 
akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; + import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -14,6 +17,9 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; @@ -74,14 +80,23 @@ public class ActorContextTest extends AbstractActorTest{ } private static Props props(final boolean found, final ActorRef actorRef){ - return Props.create(new Creator() { + return Props.create(new MockShardManagerCreator(found, actorRef) ); + } - @Override public MockShardManager create() - throws Exception { - return new MockShardManager(found, - actorRef); - } - }); + @SuppressWarnings("serial") + private static class MockShardManagerCreator implements Creator { + final boolean found; + final ActorRef actorRef; + + MockShardManagerCreator(boolean found, ActorRef actorRef) { + this.found = found; + this.actorRef = actorRef; + } + + @Override + public MockShardManager create() throws Exception { + return new MockShardManager(found, actorRef); + } } } @@ -90,6 +105,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); @@ -118,6 +134,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardManagerActorRef = getSystem() @@ -145,6 +162,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); @@ -173,6 +191,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { ActorRef shardManagerActorRef = getSystem() @@ -193,4 +212,68 @@ public class ActorContextTest extends AbstractActorTest{ }}; } + + @Test + public void testExecuteRemoteOperation() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("3 seconds")) { + @Override + protected void run() { + + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + + Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds")); + + assertEquals("hello", out); + + expectNoMsg(); + } + }; + }}; + } + + @Test + public void testExecuteRemoteOperationAsync() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("3 seconds")) { + @Override + 
protected void run() { + + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); + + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + + Future future = actorContext.executeRemoteOperationAsync(actor, "hello", + Duration.create(3, TimeUnit.SECONDS)); + + try { + Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); + assertEquals("Result", "hello", result); + } catch(Exception e) { + throw new AssertionError(e); + } + + expectNoMsg(); + } + }; + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 5d3853f311..b19fd3a529 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.datastore.utils; - +import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -16,10 +16,12 @@ import scala.concurrent.duration.FiniteDuration; public class MockActorContext extends ActorContext { - private Object executeShardOperationResponse; - private Object executeRemoteOperationResponse; - private Object executeLocalOperationResponse; - private Object executeLocalShardOperationResponse; + private volatile Object executeShardOperationResponse; + private volatile Object executeRemoteOperationResponse; + private volatile Object executeLocalOperationResponse; + private volatile Object executeLocalShardOperationResponse; + private volatile Exception executeRemoteOperationFailure; + private volatile Object inputMessage; public MockActorContext(ActorSystem actorSystem) { super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration()); @@ -52,6 +54,10 @@ public class MockActorContext extends ActorContext { executeRemoteOperationResponse = response; } + public void setExecuteRemoteOperationFailure(Exception executeRemoteOperationFailure) { + this.executeRemoteOperationFailure = executeRemoteOperationFailure; + } + public void setExecuteLocalOperationResponse( Object executeLocalOperationResponse) { this.executeLocalOperationResponse = executeLocalOperationResponse; @@ -62,12 +68,20 @@ public class MockActorContext extends ActorContext { this.executeLocalShardOperationResponse = executeLocalShardOperationResponse; } - @Override public Object executeLocalOperation(ActorRef actor, + @SuppressWarnings("unchecked") + public T getInputMessage(Class expType) throws Exception { + assertNotNull("Input message was null", inputMessage); + return (T) expType.getMethod("fromSerializable", Object.class).invoke(null, inputMessage); + } + + @Override + public Object executeLocalOperation(ActorRef actor, Object message, FiniteDuration duration) { return this.executeLocalOperationResponse; } - @Override public Object executeLocalShardOperation(String shardName, + @Override + public Object 
executeLocalShardOperation(String shardName, Object message, FiniteDuration duration) { return this.executeLocalShardOperationResponse; } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index 948f3c8d8b..8664e8910b 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -16,8 +16,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFac import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; -import org.opendaylight.yangtools.util.PropertyUtils; - import com.google.common.collect.ImmutableMap; /** @@ -26,17 +24,6 @@ import com.google.common.collect.ImmutableMap; public final class DomInmemoryDataBrokerModule extends org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule { - private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP = - "mdsal.datastore-future-callback-queue.size"; - private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000; - - private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP = - "mdsal.datastore-future-callback-pool.size"; - private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20; - private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP = - "mdsal.datastore-commit-queue.size"; - private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000; - public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); @@ -81,9 +68,7 @@ public final class DomInmemoryDataBrokerModule extends * system it's running on. */ ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( - PropertyUtils.getIntSystemProperty( - COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP, - DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit"); + getMaxDataBrokerCommitQueueSize(), "WriteTxCommit"); /* * We use an executor for commit ListenableFuture callbacks that favors reusing available @@ -94,12 +79,8 @@ public final class DomInmemoryDataBrokerModule extends * reached, subsequent submitted tasks will block the caller. 
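The removed system-property constants above only sized two bounded executors, now configured through the module: a bounded single-thread commit executor and a "blocking bounded" pool for commit future callbacks whose submitters block once the queue limit is reached. A rough, generic sketch of that caller-blocking behaviour using plain java.util.concurrent (not the actual SpecialExecutors implementation, whose internals are not part of this patch), sized with the former defaults of 20 threads and a queue of 1000:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingBoundedPoolSketch {

    // A fixed-size pool with a bounded queue; when the queue is full the submitting thread blocks.
    static ExecutorService newBlockingBoundedPool(int maxPoolSize, int maxQueueSize) {
        RejectedExecutionHandler blockCaller = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                try {
                    // Park the caller until queue space frees up instead of throwing.
                    executor.getQueue().put(task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
                }
            }
        };
        return new ThreadPoolExecutor(maxPoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(maxQueueSize), blockCaller);
    }

    public static void main(String[] args) {
        ExecutorService commitFutures = newBlockingBoundedPool(20, 1000);
        commitFutures.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("commit future callback ran");
            }
        });
        commitFutures.shutdown();
    }
}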
*/ Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool( - PropertyUtils.getIntSystemProperty( - FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP, - DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE), - PropertyUtils.getIntSystemProperty( - FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP, - DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures"); + getMaxDataBrokerFutureCallbackPoolSize(), getMaxDataBrokerFutureCallbackQueueSize(), + "CommitFutures"); DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, new DeadlockDetectingListeningExecutorService(commitExecutor, diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang b/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang index a0ee5c50c9..b1df7efcdb 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang +++ b/opendaylight/md-sal/sal-dom-broker/src/main/yang/opendaylight-dom-broker-impl.yang @@ -108,6 +108,24 @@ module opendaylight-sal-dom-broker-impl { } } } + + leaf max-data-broker-future-callback-queue-size { + default 1000; + type uint16; + description "The maximum queue size for the data broker's commit future callback executor."; + } + + leaf max-data-broker-future-callback-pool-size { + default 20; + type uint16; + description "The maximum thread pool size for the data broker's commit future callback executor."; + } + + leaf max-data-broker-commit-queue-size { + default 5000; + type uint16; + description "The maximum queue size for the data broker's commit executor."; + } } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java index 39a448ff6c..fd1627c6f9 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java @@ -1,5 +1,6 @@ package org.opendaylight.controller.config.yang.inmemory_datastore_provider; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule { @@ -19,7 +20,9 @@ public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.cont @Override public java.lang.AutoCloseable createInstance() { - return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency()); + return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency(), + InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(), + getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize())); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java 
b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java index 615fe0211c..7026b03022 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java @@ -1,5 +1,6 @@ package org.opendaylight.controller.config.yang.inmemory_datastore_provider; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule { @@ -19,7 +20,9 @@ public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight @Override public java.lang.AutoCloseable createInstance() { - return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency()); + return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency(), + InMemoryDOMDataStoreConfigProperties.create(getMaxDataChangeExecutorPoolSize(), + getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize())); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index b61b367103..d0d3fe9e6a 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -21,7 +21,6 @@ import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedEx import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.yangtools.util.ExecutorServiceUtil; -import org.opendaylight.yangtools.util.PropertyUtils; import org.opendaylight.yangtools.util.concurrent.NotificationManager; import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; @@ -85,11 +84,6 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } }; - private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP = - "mdsal.datastore-dcl-notification-queue.size"; - - private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000; - private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(); private final ListenerTree listenerTree = ListenerTree.create(); private final AtomicLong txCounter = new AtomicLong(0); @@ -104,17 +98,21 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor, final ExecutorService 
dataChangeListenerExecutor) { + this(name, listeningExecutor, dataChangeListenerExecutor, + InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE); + } + + public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor, + final ExecutorService dataChangeListenerExecutor, int maxDataChangeListenerQueueSize) { this.name = Preconditions.checkNotNull(name); this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor); this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor); - int maxDCLQueueSize = PropertyUtils.getIntSystemProperty( - DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE ); - dataChangeListenerNotificationManager = new QueuedNotificationManager<>(this.dataChangeListenerExecutor, - DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr"); + DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize, + "DataChangeListenerQueueMgr"); } @Override diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreConfigProperties.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreConfigProperties.java new file mode 100644 index 0000000000..6e451ba12b --- /dev/null +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreConfigProperties.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.dom.store.impl; + +/** + * Holds configuration properties when creating an {@link InMemoryDOMDataStore} instance via the + * {@link InMemoryDOMDataStoreFactory} + * + * @author Thomas Pantelis + * @see InMemoryDOMDataStoreFactory + */ +public class InMemoryDOMDataStoreConfigProperties { + + public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE = 1000; + public static final int DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE = 20; + public static final int DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE = 1000; + + private static final InMemoryDOMDataStoreConfigProperties DEFAULT = + create(DEFAULT_MAX_DATA_CHANGE_EXECUTOR_POOL_SIZE, + DEFAULT_MAX_DATA_CHANGE_EXECUTOR_QUEUE_SIZE, + DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE); + + private final int maxDataChangeExecutorQueueSize; + private final int maxDataChangeExecutorPoolSize; + private final int maxDataChangeListenerQueueSize; + + /** + * Constructs an instance with the given property values. + * + * @param maxDataChangeExecutorPoolSize + * maximum thread pool size for the data change notification executor. + * @param maxDataChangeExecutorQueueSize + * maximum queue size for the data change notification executor. + * @param maxDataChangeListenerQueueSize + * maximum queue size for the data change listeners. 
+ */ + public static InMemoryDOMDataStoreConfigProperties create(int maxDataChangeExecutorPoolSize, + int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) { + return new InMemoryDOMDataStoreConfigProperties(maxDataChangeExecutorPoolSize, + maxDataChangeExecutorQueueSize, maxDataChangeListenerQueueSize); + } + + /** + * Returns the InMemoryDOMDataStoreConfigProperties instance with default values. + */ + public static InMemoryDOMDataStoreConfigProperties getDefault() { + return DEFAULT; + } + + private InMemoryDOMDataStoreConfigProperties(int maxDataChangeExecutorPoolSize, + int maxDataChangeExecutorQueueSize, int maxDataChangeListenerQueueSize) { + this.maxDataChangeExecutorQueueSize = maxDataChangeExecutorQueueSize; + this.maxDataChangeExecutorPoolSize = maxDataChangeExecutorPoolSize; + this.maxDataChangeListenerQueueSize = maxDataChangeListenerQueueSize; + } + + /** + * Returns the maximum queue size for the data change notification executor. + */ + public int getMaxDataChangeExecutorQueueSize() { + return maxDataChangeExecutorQueueSize; + } + + /** + * Returns the maximum thread pool size for the data change notification executor. + */ + public int getMaxDataChangeExecutorPoolSize() { + return maxDataChangeExecutorPoolSize; + } + + /** + * Returns the maximum queue size for the data change listeners. + */ + public int getMaxDataChangeListenerQueueSize() { + return maxDataChangeListenerQueueSize; + } +} diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java index c853a132de..a3512743ed 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java @@ -15,7 +15,6 @@ import javax.annotation.Nullable; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; -import org.opendaylight.yangtools.util.PropertyUtils; import com.google.common.util.concurrent.MoreExecutors; /** @@ -25,43 +24,46 @@ import com.google.common.util.concurrent.MoreExecutors; */ public final class InMemoryDOMDataStoreFactory { - private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP = - "mdsal.datastore-dcl-notification-queue.size"; - private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000; - - private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP = - "mdsal.datastore-dcl-notification-pool.size"; - private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20; - private InMemoryDOMDataStoreFactory() { } + public static InMemoryDOMDataStore create(final String name, + @Nullable final SchemaService schemaService) { + return create(name, schemaService, null); + } + /** * Creates an InMemoryDOMDataStore instance. * * @param name the name of the data store * @param schemaService the SchemaService to which to register the data store. + * @param properties configuration properties for the InMemoryDOMDataStore instance. If null, + * default property values are used. 
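The class introduced above is a simple immutable holder that replaces the old system-property lookups; callers either build one explicitly or fall back to getDefault(). A tiny usage sketch of that holder, with illustrative values (only the create/getDefault factory methods and getters shown in the patch are used):

import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;

public class ConfigPropertiesSketch {
    public static void main(String[] args) {
        InMemoryDOMDataStoreConfigProperties custom =
                InMemoryDOMDataStoreConfigProperties.create(10, 500, 250);
        InMemoryDOMDataStoreConfigProperties defaults =
                InMemoryDOMDataStoreConfigProperties.getDefault();

        // Getter names mirror the leafs added to the YANG models in this patch.
        System.out.println(custom.getMaxDataChangeExecutorPoolSize());    // 10
        System.out.println(defaults.getMaxDataChangeExecutorQueueSize()); // 1000, the former default
        System.out.println(defaults.getMaxDataChangeListenerQueueSize()); // 1000, the former default
    }
}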
* @return an InMemoryDOMDataStore instance */ public static InMemoryDOMDataStore create(final String name, - @Nullable final SchemaService schemaService) { + @Nullable final SchemaService schemaService, + @Nullable final InMemoryDOMDataStoreConfigProperties properties) { + + InMemoryDOMDataStoreConfigProperties actualProperties = properties; + if(actualProperties == null) { + actualProperties = InMemoryDOMDataStoreConfigProperties.getDefault(); + } // For DataChangeListener notifications we use an executor that provides the fastest // task execution time to get higher throughput as DataChangeListeners typically provide // much of the business logic for a data model. If the executor queue size limit is reached, // subsequent submitted notifications will block the calling thread. - int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty( - DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE); - int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty( - DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE); + int dclExecutorMaxQueueSize = actualProperties.getMaxDataChangeExecutorQueueSize(); + int dclExecutorMaxPoolSize = actualProperties.getMaxDataChangeExecutorPoolSize(); ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool( dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" ); InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()), - dataChangeListenerExecutor); + dataChangeListenerExecutor, actualProperties.getMaxDataChangeListenerQueueSize()); if(schemaService != null) { schemaService.registerSchemaContextListener(dataStore); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/yang/opendaylight-inmemory-datastore-provider.yang b/opendaylight/md-sal/sal-inmemory-datastore/src/main/yang/opendaylight-inmemory-datastore-provider.yang index d4f57b53fe..1292d3772a 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/yang/opendaylight-inmemory-datastore-provider.yang +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/yang/opendaylight-inmemory-datastore-provider.yang @@ -41,34 +41,68 @@ module opendaylight-inmemory-datastore-provider { when "/config:modules/config:module/config:type = 'inmemory-config-datastore-provider'"; container schema-service { - uses config:service-ref { + uses config:service-ref { refine type { mandatory false; config:required-identity sal:schema-service; } - } + } + } + + leaf max-data-change-executor-queue-size { + default 1000; + type uint16; + description "The maximum queue size for the data change notification executor."; + } + + leaf max-data-change-executor-pool-size { + default 20; + type uint16; + description "The maximum thread pool size for the data change notification executor."; + } + + leaf max-data-change-listener-queue-size { + default 1000; + type uint16; + description "The maximum queue size for the data change listeners."; } } } + // Augments the 'configuration' choice node under modules/module. 
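The new YANG leafs surface the former system properties as module configuration; the generated provider module reads them through getters and hands them to the factory, as InMemoryConfigDataStoreProviderModule.createInstance does above. A condensed sketch of that wiring, using only the factory and properties APIs shown in this patch (the literal numbers stand in for the YANG defaults, and the null SchemaService is just to keep the sketch self-contained):

import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;

public class DatastoreConfigWiringSketch {
    public static void main(String[] args) throws Exception {
        // Values normally come from the YANG-generated getters, e.g. getMaxDataChangeExecutorPoolSize().
        InMemoryDOMDataStoreConfigProperties props = InMemoryDOMDataStoreConfigProperties.create(
                20 /* executor pool size */, 1000 /* executor queue size */, 1000 /* listener queue size */);

        // A null SchemaService is allowed; passing null properties would fall back to getDefault().
        try (InMemoryDOMDataStore store = InMemoryDOMDataStoreFactory.create("DOM-CFG", null, props)) {
            System.out.println(store.getIdentifier());
        }
    }
}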
+ augment "/config:modules/config:module/config:configuration" { + case inmemory-operational-datastore-provider { + when "/config:modules/config:module/config:type = 'inmemory-operational-datastore-provider'"; + // Yang does not allow two cases from same namespaces with same children + // Schema-service dependency renamed to operational-schema-service + // to prevent conflict with schema-service container from inmemory-config-datastore-provider + container operational-schema-service { + uses config:service-ref { + refine type { + mandatory false; + config:required-identity sal:schema-service; + } + } + } + + leaf max-data-change-executor-queue-size { + default 1000; + type uint16; + description "The maximum queue size for the data change notification executor."; + } - // Augments the 'configuration' choice node under modules/module. - augment "/config:modules/config:module/config:configuration" { - case inmemory-operational-datastore-provider { - when "/config:modules/config:module/config:type = 'inmemory-operational-datastore-provider'"; + leaf max-data-change-executor-pool-size { + default 20; + type uint16; + description "The maximum thread pool size for the data change notification executor."; + } - // Yang does not allow two cases from same namespaces with same children - // Schema-service dependency renamed to operational-schema-service - // to prevent conflict with schema-service container from inmemory-config-datastore-provider - container operational-schema-service { - uses config:service-ref { - refine type { - mandatory false; - config:required-identity sal:schema-service; - } - } - } + leaf max-data-change-listener-queue-size { + default 1000; + type uint16; + description "The maximum queue size for the data change listeners."; } } + } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java index 77e342641e..d6bfc0c3b6 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java @@ -115,18 +115,21 @@ public final class NetconfStateSchemas { final CompositeNode schemasNode = (CompositeNode) NetconfMessageTransformUtil.findNode(schemasNodeResult.getResult(), DATA_STATE_SCHEMAS_IDENTIFIER); - return create(schemasNode); + return create(id, schemasNode); } /** * Parse response of get(netconf-state/schemas) to find all schemas under netconf-state/schemas */ @VisibleForTesting - protected static NetconfStateSchemas create(final CompositeNode schemasNode) { + protected static NetconfStateSchemas create(final RemoteDeviceId id, final CompositeNode schemasNode) { final Set availableYangSchemas = Sets.newHashSet(); for (final CompositeNode schemaNode : schemasNode.getCompositesByName(Schema.QNAME.withoutRevision())) { - availableYangSchemas.add(RemoteYangSchema.createFromCompositeNode(schemaNode)); + final Optional fromCompositeNode = RemoteYangSchema.createFromCompositeNode(id, schemaNode); + if(fromCompositeNode.isPresent()) { + availableYangSchemas.add(fromCompositeNode.get()); + } } return new NetconfStateSchemas(availableYangSchemas); @@ -143,19 +146,23 @@ public final class NetconfStateSchemas { return qname; } - static RemoteYangSchema 
createFromCompositeNode(final CompositeNode schemaNode) { + static Optional createFromCompositeNode(final RemoteDeviceId id, final CompositeNode schemaNode) { Preconditions.checkArgument(schemaNode.getKey().equals(Schema.QNAME.withoutRevision()), "Wrong QName %s", schemaNode.getKey()); QName childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_FORMAT.withoutRevision(); final String formatAsString = getSingleChildNodeValue(schemaNode, childNode).get(); - Preconditions.checkArgument(formatAsString.equals(Yang.QNAME.getLocalName()), - "Expecting format to be only %s, not %s", Yang.QNAME.getLocalName(), formatAsString); + if(formatAsString.equals(Yang.QNAME.getLocalName()) == false) { + logger.debug("{}: Ignoring schema due to unsupported format: {}", id, formatAsString); + return Optional.absent(); + } childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_LOCATION.withoutRevision(); final Set locationsAsString = getAllChildNodeValues(schemaNode, childNode); - Preconditions.checkArgument(locationsAsString.contains(Schema.Location.Enumeration.NETCONF.toString()), - "Expecting location to be %s, not %s", Schema.Location.Enumeration.NETCONF.toString(), locationsAsString); + if(locationsAsString.contains(Schema.Location.Enumeration.NETCONF.toString()) == false) { + logger.debug("{}: Ignoring schema due to unsupported location: {}", id, locationsAsString); + return Optional.absent(); + } childNode = NetconfMessageTransformUtil.IETF_NETCONF_MONITORING_SCHEMA_NAMESPACE.withoutRevision(); final String namespaceAsString = getSingleChildNodeValue(schemaNode, childNode).get(); @@ -171,7 +178,7 @@ public final class NetconfStateSchemas { ? QName.create(namespaceAsString, revisionAsString.get(), moduleNameAsString) : QName.create(URI.create(namespaceAsString), null, moduleNameAsString).withoutRevision(); - return new RemoteYangSchema(moduleQName); + return Optional.of(new RemoteYangSchema(moduleQName)); } private static Set getAllChildNodeValues(final CompositeNode schemaNode, final QName childNodeQName) { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java index 16a915e730..3f9c8caa0e 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemasTest.java @@ -7,6 +7,7 @@ import static org.junit.matchers.JUnitMatchers.hasItem; import java.util.Set; import org.junit.Test; import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils; @@ -18,7 +19,7 @@ public class NetconfStateSchemasTest { public void testCreate() throws Exception { final Document schemasXml = XmlUtil.readXmlToDocument(getClass().getResourceAsStream("/netconf-state.schemas.payload.xml")); final CompositeNode compositeNodeSchemas = (CompositeNode) XmlDocumentUtils.toDomNode(schemasXml); - final NetconfStateSchemas schemas = NetconfStateSchemas.create(compositeNodeSchemas); + final NetconfStateSchemas schemas = 
NetconfStateSchemas.create(new RemoteDeviceId("device"), compositeNodeSchemas); final Set availableYangSchemasQNames = schemas.getAvailableYangSchemasQNames(); assertEquals(73, availableYangSchemasQNames.size()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index 38ec5f5ac2..4e007f4c5d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -59,18 +59,23 @@ org.opendaylight.controller sal-core-api - - org.opendaylight.controller - netconf-util - - org.opendaylight.controller - sal-core-spi + org.opendaylight.controller + sal-core-spi + + + org.opendaylight.controller + sal-common-impl - org.opendaylight.controller - sal-common-impl + org.opendaylight.controller + netconf-util + + org.opendaylight.controller + sal-clustering-commons + + @@ -112,6 +117,11 @@ scala-library + + com.codahale.metrics + metrics-core + 3.0.1 + junit @@ -156,8 +166,11 @@ ${project.groupId}.${project.artifactId} - !org.jboss.*;!com.jcraft.*;* + !org.iq80.*;!*snappy;!org.jboss.*;!com.jcraft.*;!org.fusesource.*;* + sal-clustering-commons; + sal-akka-raft; + *metrics*; !sal*; !*config-api*; !*testkit*; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 02e2d12015..4496bd3263 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -7,7 +7,7 @@ import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.controller.remote.rpc.utils.ActorUtil; -import org.opendaylight.controller.remote.rpc.utils.XmlUtils; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.yangtools.yang.common.QName; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 611618f1f6..4ec96c29cd 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -20,7 +20,7 @@ import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; -import org.opendaylight.controller.remote.rpc.utils.XmlUtils; +import org.opendaylight.controller.xml.codec.XmlUtils; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.yangtools.yang.common.RpcResult; diff --git 
a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java index 8dbc5b50ee..f11e25c046 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.sal.restconf.impl; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -41,7 +40,6 @@ import javax.ws.rs.core.Response.Status; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; @@ -178,39 +176,31 @@ public class BrokerFacade { private NormalizedNode readDataViaTransaction(final DOMDataReadTransaction transaction, LogicalDatastoreType datastore, YangInstanceIdentifier path) { LOG.trace("Read " + datastore.name() + " via Restconf: {}", path); - final ListenableFuture>> listenableFuture = transaction.read(datastore, path); - if (listenableFuture != null) { - Optional> optional; - try { - LOG.debug("Reading result data from transaction."); - optional = listenableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RestconfDocumentedException("Problem to get data from transaction.", e.getCause()); + final CheckedFuture>, ReadFailedException> listenableFuture = + transaction.read(datastore, path); - } - if (optional != null) { - if (optional.isPresent()) { - return optional.get(); - } - } + try { + Optional> optional = listenableFuture.checkedGet(); + return optional.isPresent() ? 
optional.get() : null; + } catch(ReadFailedException e) { + throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList()); } - return null; } private CheckedFuture postDataViaTransaction( final DOMDataReadWriteTransaction rWTransaction, final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final NormalizedNode payload, DataNormalizationOperation root) { - ListenableFuture>> futureDatastoreData = rWTransaction.read(datastore, path); + CheckedFuture>, ReadFailedException> futureDatastoreData = + rWTransaction.read(datastore, path); try { - final Optional> optionalDatastoreData = futureDatastoreData.get(); + final Optional> optionalDatastoreData = futureDatastoreData.checkedGet(); if (optionalDatastoreData.isPresent() && payload.equals(optionalDatastoreData.get())) { - String errMsg = "Post Configuration via Restconf was not executed because data already exists"; - LOG.trace(errMsg + ":{}", path); + LOG.trace("Post Configuration via Restconf was not executed because data already exists :{}", path); throw new RestconfDocumentedException("Data already exists for path: " + path, ErrorType.PROTOCOL, ErrorTag.DATA_EXISTS); } - } catch (InterruptedException | ExecutionException e) { - LOG.trace("It wasn't possible to get data loaded from datastore at path " + path); + } catch(ReadFailedException e) { + LOG.warn("Error reading from datastore with path: " + path, e); } ensureParentsByMerge(datastore, path, rWTransaction, root); @@ -251,27 +241,21 @@ public class BrokerFacade { try { currentOp = currentOp.getChild(currentArg); } catch (DataNormalizationException e) { - throw new IllegalArgumentException( - String.format("Invalid child encountered in path %s", normalizedPath), e); + throw new RestconfDocumentedException( + String.format("Error normalizing data for path %s", normalizedPath), e); } currentArguments.add(currentArg); YangInstanceIdentifier currentPath = YangInstanceIdentifier.create(currentArguments); - final Boolean exists; - try { - CheckedFuture future = - rwTx.exists(store, currentPath); - exists = future.checkedGet(); + boolean exists = rwTx.exists(store, currentPath).checkedGet(); + if (!exists && iterator.hasNext()) { + rwTx.merge(store, currentPath, currentOp.createDefault(currentArg)); + } } catch (ReadFailedException e) { LOG.error("Failed to read pre-existing data from store {} path {}", store, currentPath, e); - throw new IllegalStateException("Failed to read pre-existing data", e); - } - - - if (!exists && iterator.hasNext()) { - rwTx.merge(store, currentPath, currentOp.createDefault(currentArg)); + throw new RestconfDocumentedException("Failed to read pre-existing data", e); } } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfDocumentedException.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfDocumentedException.java index e3e0c3a2bd..2ceef3203c 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfDocumentedException.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfDocumentedException.java @@ -10,11 +10,17 @@ package org.opendaylight.controller.sal.restconf.impl; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.Collection; import java.util.List; + import 
javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response.Status; + import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag; import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcError; /** * Unchecked exception to communicate error information, as defined in the ietf restcong draft, to be sent to the @@ -57,8 +63,8 @@ public class RestconfDocumentedException extends WebApplicationException { } /** - * Constructs an instance with an error message and exception cause. The stack trace of the exception is included in - * the error info. + * Constructs an instance with an error message and exception cause. + * The stack trace of the exception is included in the error info. * * @param message * A string which provides a plain text string describing the error. @@ -80,12 +86,25 @@ public class RestconfDocumentedException extends WebApplicationException { /** * Constructs an instance with the given errors. */ - public RestconfDocumentedException(List errors) { - this.errors = ImmutableList.copyOf(errors); - Preconditions.checkArgument(!this.errors.isEmpty(), "RestconfError list can't be empty"); + public RestconfDocumentedException(String message, Throwable cause, List errors) { + super(message, cause); + if(!errors.isEmpty()) { + this.errors = ImmutableList.copyOf(errors); + } else { + this.errors = ImmutableList.of(new RestconfError(RestconfError.ErrorType.APPLICATION, + RestconfError.ErrorTag.OPERATION_FAILED, message)); + } + status = null; } + /** + * Constructs an instance with the given RpcErrors. + */ + public RestconfDocumentedException(String message, Throwable cause, Collection rpcErrors) { + this(message, cause, convertToRestconfErrors(rpcErrors)); + } + /** * Constructs an instance with an HTTP status and no error information. 
* @@ -105,6 +124,18 @@ public class RestconfDocumentedException extends WebApplicationException { status = null; } + private static List convertToRestconfErrors(Collection rpcErrors) { + List errorList = Lists.newArrayList(); + if(rpcErrors != null) { + for (RpcError rpcError : rpcErrors) { + errorList.add(new RestconfError(rpcError)); + } + } + + return errorList; + } + + public List getErrors() { return errors; } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java index fac6c80564..b94f6a6166 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java @@ -21,7 +21,6 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -37,6 +36,8 @@ import javax.ws.rs.core.UriInfo; import org.apache.commons.lang3.StringUtils; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.sal.rest.api.Draft02; @@ -52,7 +53,6 @@ import org.opendaylight.controller.sal.streams.websockets.WebSocketServer; import org.opendaylight.yangtools.concepts.Codec; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.QNameModule; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.MutableCompositeNode; @@ -609,19 +609,8 @@ public class RestconfImpl implements RestconfService { private void checkRpcSuccessAndThrowException(final RpcResult rpcResult) { if (rpcResult.isSuccessful() == false) { - Collection rpcErrors = rpcResult.getErrors(); - if (rpcErrors == null || rpcErrors.isEmpty()) { - throw new RestconfDocumentedException( - "The operation was not successful and there were no RPC errors returned", ErrorType.RPC, - ErrorTag.OPERATION_FAILED); - } - - List errorList = Lists.newArrayList(); - for (RpcError rpcError : rpcErrors) { - errorList.add(new RestconfError(rpcError)); - } - - throw new RestconfDocumentedException(errorList); + throw new RestconfDocumentedException("The operation was not successful", null, + rpcResult.getErrors()); } } @@ -729,18 +718,50 @@ public class RestconfImpl implements RestconfService { iiWithData.getSchemaNode()); YangInstanceIdentifier normalizedII; + if (mountPoint != null) { + normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized( + iiWithData.getInstanceIdentifier()); + } else { + normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier()); + } - try { - if (mountPoint != null) { - normalizedII = 
new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData - .getInstanceIdentifier()); - broker.commitConfigurationDataPut(mountPoint, normalizedII, datastoreNormalizedNode).get(); - } else { - normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier()); - broker.commitConfigurationDataPut(normalizedII, datastoreNormalizedNode).get(); + /* + * There is a small window where another write transaction could be updating the same data + * simultaneously and we get an OptimisticLockFailedException. This error is likely + * transient and The WriteTransaction#submit API docs state that a retry will likely + * succeed. So we'll try again if that scenario occurs. If it fails a third time then it + * probably will never succeed so we'll fail in that case. + * + * By retrying we're attempting to hide the internal implementation of the data store and + * how it handles concurrent updates from the restconf client. The client has instructed us + * to put the data and we should make every effort to do so without pushing optimistic lock + * failures back to the client and forcing them to handle it via retry (and having to + * document the behavior). + */ + int tries = 2; + while(true) { + try { + if (mountPoint != null) { + broker.commitConfigurationDataPut(mountPoint, normalizedII, + datastoreNormalizedNode).checkedGet(); + } else { + broker.commitConfigurationDataPut(normalizedII, + datastoreNormalizedNode).checkedGet(); + } + + break; + } catch (TransactionCommitFailedException e) { + if(e instanceof OptimisticLockFailedException) { + if(--tries <= 0) { + LOG.debug("Got OptimisticLockFailedException on last try - failing"); + throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList()); + } + + LOG.debug("Got OptimisticLockFailedException - trying again"); + } else { + throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList()); + } } - } catch (Exception e) { - throw new RestconfDocumentedException("Error updating data", e); } return Response.status(Status.OK).build(); @@ -852,6 +873,8 @@ public class RestconfImpl implements RestconfService { normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier()); broker.commitConfigurationDataPost(normalizedII, datastoreNormalizedData); } + } catch(RestconfDocumentedException e) { + throw e; } catch (Exception e) { throw new RestconfDocumentedException("Error creating data", e); } @@ -898,6 +921,8 @@ public class RestconfImpl implements RestconfService { normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier()); broker.commitConfigurationDataPost(normalizedII, datastoreNormalizedData); } + } catch(RestconfDocumentedException e) { + throw e; } catch (Exception e) { throw new RestconfDocumentedException("Error creating data", e); } @@ -1156,8 +1181,9 @@ public class RestconfImpl implements RestconfService { private CompositeNode normalizeNode(final Node node, final DataSchemaNode schema, final DOMMountPoint mountPoint) { if (schema == null) { - QName nodeType = node == null ? null : node.getNodeType(); - String localName = nodeType == null ? null : nodeType.getLocalName(); + String localName = node == null ? null : + node instanceof NodeWrapper ? 
((NodeWrapper)node).getLocalName() : + node.getNodeType().getLocalName(); throw new RestconfDocumentedException("Data schema node was not found for " + localName, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/InvokeRpcMethodTest.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/InvokeRpcMethodTest.java index 500baafab3..2f045ce381 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/InvokeRpcMethodTest.java +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/InvokeRpcMethodTest.java @@ -146,7 +146,7 @@ public class InvokeRpcMethodTest { restconfImpl.invokeRpc("toaster:cancel-toast", "", uriInfo); fail("Expected an exception to be thrown."); } catch (RestconfDocumentedException e) { - verifyRestconfDocumentedException(e, 0, ErrorType.RPC, ErrorTag.OPERATION_FAILED, + verifyRestconfDocumentedException(e, 0, ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED, Optional. absent(), Optional. absent()); } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestPutOperationTest.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestPutOperationTest.java index 3284546dcb..3591bfb22b 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestPutOperationTest.java +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestPutOperationTest.java @@ -16,19 +16,23 @@ import static org.mockito.Mockito.when; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; + import javax.ws.rs.client.Entity; import javax.ws.rs.core.Application; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; + import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.test.JerseyTest; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; @@ -158,6 +162,36 @@ public class RestPutOperationTest extends JerseyTest { assertEquals(200, put(uri, MediaType.APPLICATION_XML, xmlData3)); } + @Test + public void putWithOptimisticLockFailedException() throws UnsupportedEncodingException { + + String uri = "/config/ietf-interfaces:interfaces/interface/eth0"; + + doThrow(OptimisticLockFailedException.class). + when(brokerFacade).commitConfigurationDataPut( + any(YangInstanceIdentifier.class), any(NormalizedNode.class)); + + assertEquals(500, put(uri, MediaType.APPLICATION_XML, xmlData)); + + doThrow(OptimisticLockFailedException.class).doReturn(mock(CheckedFuture.class)). 
+ when(brokerFacade).commitConfigurationDataPut( + any(YangInstanceIdentifier.class), any(NormalizedNode.class)); + + assertEquals(200, put(uri, MediaType.APPLICATION_XML, xmlData)); + } + + @Test + public void putWithTransactionCommitFailedException() throws UnsupportedEncodingException { + + String uri = "/config/ietf-interfaces:interfaces/interface/eth0"; + + doThrow(TransactionCommitFailedException.class). + when(brokerFacade).commitConfigurationDataPut( + any(YangInstanceIdentifier.class), any(NormalizedNode.class)); + + assertEquals(500, put(uri, MediaType.APPLICATION_XML, xmlData)); + } + private int put(String uri, String mediaType, String data) throws UnsupportedEncodingException { return target(uri).request(mediaType).put(Entity.entity(data, mediaType)).getStatus(); } diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfDocumentedExceptionMapperTest.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfDocumentedExceptionMapperTest.java index 2747b9e264..4e9c96ac3e 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfDocumentedExceptionMapperTest.java +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfDocumentedExceptionMapperTest.java @@ -412,7 +412,7 @@ public class RestconfDocumentedExceptionMapperTest extends JerseyTest { List errorList = Arrays.asList(new RestconfError(ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, "mock error1"), new RestconfError(ErrorType.RPC, ErrorTag.ROLLBACK_FAILED, "mock error2")); - stageMockEx(new RestconfDocumentedException(errorList)); + stageMockEx(new RestconfDocumentedException("mock", null, errorList)); Response resp = target("/operational/foo").request(MediaType.APPLICATION_JSON).get(); @@ -651,7 +651,7 @@ public class RestconfDocumentedExceptionMapperTest extends JerseyTest { List errorList = Arrays.asList(new RestconfError(ErrorType.APPLICATION, ErrorTag.LOCK_DENIED, "mock error1"), new RestconfError(ErrorType.RPC, ErrorTag.ROLLBACK_FAILED, "mock error2")); - stageMockEx(new RestconfDocumentedException(errorList)); + stageMockEx(new RestconfDocumentedException("mock", null, errorList)); Response resp = target("/operational/foo").request(MediaType.APPLICATION_XML).get(); diff --git a/opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/config/yang/config/toaster_provider/impl/ToasterProviderModule.java b/opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/config/yang/config/toaster_provider/impl/ToasterProviderModule.java index 388c78eaaf..5832e29a13 100644 --- a/opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/config/yang/config/toaster_provider/impl/ToasterProviderModule.java +++ b/opendaylight/md-sal/samples/toaster-provider/src/main/java/org/opendaylight/controller/config/yang/config/toaster_provider/impl/ToasterProviderModule.java @@ -76,9 +76,17 @@ public final class ToasterProviderModule extends dataChangeListenerRegistration.close(); rpcRegistration.close(); runtimeReg.close(); - opendaylightToaster.close(); + closeQuietly(opendaylightToaster); log.info("Toaster provider (instance {}) torn down.", this); } + + private void closeQuietly(final AutoCloseable resource) { + try { + resource.close(); + } catch (final Exception e) { + log.debug("Ignoring exception 
while closing {}", resource, e); + } + } } AutoCloseable ret = new AutoCloseableToaster(); diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java index 2f7bd20d61..6826b4a09c 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java @@ -49,6 +49,7 @@ import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFact import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.opendaylight.controller.netconf.ssh.NetconfSSHServer; import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider; +import org.opendaylight.controller.netconf.ssh.authentication.AuthProviderImpl; import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; @@ -136,7 +137,7 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { } public AuthProvider getAuthProvider() throws Exception { - AuthProvider mock = mock(AuthProvider.class); + AuthProvider mock = mock(AuthProviderImpl.class); doReturn(true).when(mock).authenticated(anyString(), anyString()); doReturn(PEMGenerator.generate().toCharArray()).when(mock).getPEMAsCharArray(); return mock; diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProvider.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProvider.java index 5d39dd1eb8..92f3861c05 100644 --- a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProvider.java +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProvider.java @@ -1,81 +1,16 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
* * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.netconf.ssh.authentication; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import org.opendaylight.controller.sal.authorization.AuthResultEnum; -import org.opendaylight.controller.usermanager.IUserManager; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.util.tracker.ServiceTrackerCustomizer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AuthProvider { - private static final Logger logger = LoggerFactory.getLogger(AuthProvider.class); - - private final String pem; - private IUserManager nullableUserManager; - public AuthProvider(String pemCertificate, final BundleContext bundleContext) { - checkNotNull(pemCertificate, "Parameter 'pemCertificate' is null"); - pem = pemCertificate; - - ServiceTrackerCustomizer customizer = new ServiceTrackerCustomizer() { - @Override - public IUserManager addingService(final ServiceReference reference) { - logger.trace("Service {} added", reference); - nullableUserManager = bundleContext.getService(reference); - return nullableUserManager; - } - - @Override - public void modifiedService(final ServiceReference reference, final IUserManager service) { - logger.trace("Replacing modified service {} in netconf SSH.", reference); - nullableUserManager = service; - } - - @Override - public void removedService(final ServiceReference reference, final IUserManager service) { - logger.trace("Removing service {} from netconf SSH. " + - "SSH won't authenticate users until IUserManager service will be started.", reference); - synchronized (AuthProvider.this) { - nullableUserManager = null; - } - } - }; - ServiceTracker listenerTracker = new ServiceTracker<>(bundleContext, IUserManager.class, customizer); - listenerTracker.open(); - } +package org.opendaylight.controller.netconf.ssh.authentication; - /** - * Authenticate user. This implementation tracks IUserManager and delegates the decision to it. If the service is not - * available, IllegalStateException is thrown. 
- */ - public synchronized boolean authenticated(String username, String password) { - if (nullableUserManager == null) { - logger.warn("Cannot authenticate user '{}', user manager service is missing", username); - throw new IllegalStateException("User manager service is not available"); - } - AuthResultEnum authResult = nullableUserManager.authenticate(username, password); - logger.debug("Authentication result for user '{}' : {}", username, authResult); - return authResult.equals(AuthResultEnum.AUTH_ACCEPT) || authResult.equals(AuthResultEnum.AUTH_ACCEPT_LOC); - } +public interface AuthProvider { - public char[] getPEMAsCharArray() { - return pem.toCharArray(); - } + boolean authenticated(String username, String password); - @VisibleForTesting - void setNullableUserManager(IUserManager nullableUserManager) { - this.nullableUserManager = nullableUserManager; - } + char[] getPEMAsCharArray(); } diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProviderImpl.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProviderImpl.java new file mode 100644 index 0000000000..7543d17c06 --- /dev/null +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/authentication/AuthProviderImpl.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.netconf.ssh.authentication; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import org.opendaylight.controller.sal.authorization.AuthResultEnum; +import org.opendaylight.controller.usermanager.IUserManager; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AuthProviderImpl implements AuthProvider { + private static final Logger logger = LoggerFactory.getLogger(AuthProviderImpl.class); + + private final String pem; + private IUserManager nullableUserManager; + + public AuthProviderImpl(String pemCertificate, final BundleContext bundleContext) { + checkNotNull(pemCertificate, "Parameter 'pemCertificate' is null"); + pem = pemCertificate; + + ServiceTrackerCustomizer customizer = new ServiceTrackerCustomizer() { + @Override + public IUserManager addingService(final ServiceReference reference) { + logger.trace("Service {} added", reference); + nullableUserManager = bundleContext.getService(reference); + return nullableUserManager; + } + + @Override + public void modifiedService(final ServiceReference reference, final IUserManager service) { + logger.trace("Replacing modified service {} in netconf SSH.", reference); + nullableUserManager = service; + } + + @Override + public void removedService(final ServiceReference reference, final IUserManager service) { + logger.trace("Removing service {} from netconf SSH. 
" + + "SSH won't authenticate users until IUserManager service will be started.", reference); + synchronized (AuthProviderImpl.this) { + nullableUserManager = null; + } + } + }; + ServiceTracker listenerTracker = new ServiceTracker<>(bundleContext, IUserManager.class, customizer); + listenerTracker.open(); + } + + /** + * Authenticate user. This implementation tracks IUserManager and delegates the decision to it. If the service is not + * available, IllegalStateException is thrown. + */ + @Override + public synchronized boolean authenticated(String username, String password) { + if (nullableUserManager == null) { + logger.warn("Cannot authenticate user '{}', user manager service is missing", username); + throw new IllegalStateException("User manager service is not available"); + } + AuthResultEnum authResult = nullableUserManager.authenticate(username, password); + logger.debug("Authentication result for user '{}' : {}", username, authResult); + return authResult.equals(AuthResultEnum.AUTH_ACCEPT) || authResult.equals(AuthResultEnum.AUTH_ACCEPT_LOC); + } + + @Override + public char[] getPEMAsCharArray() { + return pem.toCharArray(); + } + + @VisibleForTesting + void setNullableUserManager(IUserManager nullableUserManager) { + this.nullableUserManager = nullableUserManager; + } +} diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java index a26843fae1..503e764409 100644 --- a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java @@ -20,6 +20,7 @@ import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.opendaylight.controller.netconf.ssh.NetconfSSHServer; import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider; +import org.opendaylight.controller.netconf.ssh.authentication.AuthProviderImpl; import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil.InfixProp; @@ -72,7 +73,7 @@ public class NetconfSSHActivator implements BundleActivator { checkState(StringUtils.isNotBlank(path), "Path to ssh private key is blank. 
Reconfigure %s", NetconfConfigUtil.getPrivateKeyKey()); String privateKeyPEMString = PEMGenerator.readOrGeneratePK(new File(path)); - final AuthProvider authProvider = new AuthProvider(privateKeyPEMString, bundleContext); + final AuthProvider authProvider = new AuthProviderImpl(privateKeyPEMString, bundleContext); EventLoopGroup bossGroup = new NioEventLoopGroup(); NetconfSSHServer server = NetconfSSHServer.start(sshSocketAddress.getPort(), localAddress, authProvider, bossGroup); diff --git a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java index 488c370145..61297835a0 100644 --- a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java +++ b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java @@ -32,6 +32,7 @@ import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication. import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler; import org.opendaylight.controller.netconf.ssh.NetconfSSHServer; import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider; +import org.opendaylight.controller.netconf.ssh.authentication.AuthProviderImpl; import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.slf4j.Logger; @@ -58,7 +59,7 @@ public class SSHTest { @Test public void test() throws Exception { new Thread(new EchoServer(), "EchoServer").start(); - AuthProvider authProvider = mock(AuthProvider.class); + AuthProvider authProvider = mock(AuthProviderImpl.class); doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray(); doReturn(true).when(authProvider).authenticated(anyString(), anyString()); NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(), diff --git a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/ssh/authentication/SSHServerTest.java b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/ssh/authentication/SSHServerTest.java index 5e368bc566..75d18566ee 100644 --- a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/ssh/authentication/SSHServerTest.java +++ b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/ssh/authentication/SSHServerTest.java @@ -60,7 +60,7 @@ public class SSHServerTest { try (InputStream is = getClass().getResourceAsStream("/RSA.pk")) { pem = IOUtils.toString(is); } - AuthProvider ap = new AuthProvider(pem, mockedContext); + AuthProviderImpl ap = new AuthProviderImpl(pem, mockedContext); ap.setNullableUserManager(um); EventLoopGroup bossGroup = new NioEventLoopGroup(); NetconfSSHServer server = NetconfSSHServer.start(PORT, NetconfConfigUtil.getNetconfLocalAddress(), diff --git a/opendaylight/netconf/netconf-testtool/pom.xml b/opendaylight/netconf/netconf-testtool/pom.xml new file mode 100644 index 0000000000..ae0bb76832 --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/pom.xml @@ -0,0 +1,143 @@ + + + + + 4.0.0 + + + org.opendaylight.controller + netconf-subsystem + 0.2.5-SNAPSHOT + + + netconf-testtool + ${project.artifactId} + + + + net.sourceforge.argparse4j + argparse4j + 0.4.3 + + + ch.qos.logback + logback-classic + compile + + + 
${project.groupId} + netconf-netty-util + + + org.opendaylight.controller + commons.logback_settings + + + org.opendaylight.controller + config-netconf-connector + + + org.opendaylight.controller + netconf-connector-config + + + org.opendaylight.controller + logback-config + + + org.opendaylight.yangtools + mockito-configuration + + + org.slf4j + slf4j-api + + + xmlunit + xmlunit + + + + ${project.groupId} + config-util + + + ${project.groupId} + netconf-api + + + ${project.groupId} + netconf-client + + + ${project.groupId} + netconf-impl + + + ${project.groupId} + netconf-mapping-api + + + ${project.groupId} + netconf-monitoring + + + + ${project.groupId} + netconf-ssh + + + + ${project.groupId} + netty-config-api + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + + shade + + package + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.opendaylight.controller.netconf.test.tool.Main + + + true + executable + + + + + + + + diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/AcceptingAuthProvider.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/AcceptingAuthProvider.java new file mode 100644 index 0000000000..35f2345248 --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/AcceptingAuthProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import java.io.File; +import java.io.IOException; +import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider; +import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; + +class AcceptingAuthProvider implements AuthProvider { + private final String privateKeyPEMString; + + public AcceptingAuthProvider() { + try { + this.privateKeyPEMString = PEMGenerator.readOrGeneratePK(new File("PK")); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public synchronized boolean authenticated(final String username, final String password) { + return true; + } + + @Override + public char[] getPEMAsCharArray() { + return privateKeyPEMString.toCharArray(); + } +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/Main.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/Main.java new file mode 100644 index 0000000000..59e9f4c980 --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/Main.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.annotation.Arg; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.io.CharStreams; + +public final class Main { + + // TODO add logback config + + // TODO make exi configurable + + private static final Logger LOG = LoggerFactory.getLogger(Main.class); + + static class Params { + + @Arg(dest = "schemas-dir") + public File schemasDir; + + @Arg(dest = "devices-count") + public int deviceCount; + + @Arg(dest = "starting-port") + public int startingPort; + + @Arg(dest = "generate-configs-dir") + public File generateConfigsDir; + + @Arg(dest = "generate-configs-batch-size") + public int generateConfigBatchSize; + + @Arg(dest = "ssh") + public boolean ssh; + + static ArgumentParser getParser() { + final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf testool"); + parser.addArgument("--devices-count") + .type(Integer.class) + .setDefault(1) + .type(Integer.class) + .help("Number of simulated netconf devices to spin") + .dest("devices-count"); + + parser.addArgument("--schemas-dir") + .type(File.class) + .required(true) + .help("Directory containing yang schemas to describe simulated devices") + .dest("schemas-dir"); + + parser.addArgument("--starting-port") + .type(Integer.class) + .setDefault(17830) + .help("First port for simulated device. 
Each other device will have previous+1 port number") + .dest("starting-port"); + + parser.addArgument("--generate-configs-batch-size") + .type(Integer.class) + .setDefault(100) + .help("Number of connector configs per generated file") + .dest("generate-configs-batch-size"); + + parser.addArgument("--generate-configs-dir") + .type(File.class) + .help("Directory where initial config files for ODL distribution should be generated") + .dest("generate-configs-dir"); + + parser.addArgument("--ssh") + .type(Boolean.class) + .setDefault(true) + .help("Whether to use ssh for transport or just pure tcp") + .dest("ssh"); + + return parser; + } + + void validate() { + checkArgument(deviceCount > 0, "Device count has to be > 0"); + checkArgument(startingPort > 1024, "Starting port has to be > 1024"); + + checkArgument(schemasDir.exists(), "Schemas dir has to exist"); + checkArgument(schemasDir.isDirectory(), "Schemas dir has to be a directory"); + checkArgument(schemasDir.canRead(), "Schemas dir has to be readable"); + } + } + + public static void main(final String[] args) { + final Params params = parseArgs(args, Params.getParser()); + params.validate(); + + final NetconfDeviceSimulator netconfDeviceSimulator = new NetconfDeviceSimulator(); + try { + final List openDevices = netconfDeviceSimulator.start(params); + if(params.generateConfigsDir != null) { + new ConfigGenerator(params.generateConfigsDir, openDevices).generate(params.ssh, params.generateConfigBatchSize); + } + } catch (final Exception e) { + LOG.error("Unhandled exception", e); + netconfDeviceSimulator.close(); + System.exit(1); + } + + // Block main thread + synchronized (netconfDeviceSimulator) { + try { + netconfDeviceSimulator.wait(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + + private static Params parseArgs(final String[] args, final ArgumentParser parser) { + final Params opt = new Params(); + try { + parser.parseArgs(args, opt); + return opt; + } catch (final ArgumentParserException e) { + parser.handleError(e); + } + + System.exit(1); + return null; + } + + private static class ConfigGenerator { + public static final String NETCONF_CONNECTOR_XML = "/initial/99-netconf-connector.xml"; + public static final String NETCONF_CONNECTOR_NAME = "controller-config"; + public static final String NETCONF_CONNECTOR_PORT = "1830"; + public static final String NETCONF_USE_SSH = "false"; + public static final String SIM_DEVICE_SUFFIX = "-sim-device"; + + private final File directory; + private final List openDevices; + + public ConfigGenerator(final File directory, final List openDevices) { + this.directory = directory; + this.openDevices = openDevices; + } + + public void generate(final boolean useSsh, final int batchSize) { + if(directory.exists() == false) { + checkState(directory.mkdirs(), "Unable to create folder %s" + directory); + } + + try(InputStream stream = Main.class.getResourceAsStream(NETCONF_CONNECTOR_XML)) { + checkNotNull(stream, "Cannot load %s", NETCONF_CONNECTOR_XML); + String configBlueprint = CharStreams.toString(new InputStreamReader(stream, Charsets.UTF_8)); + + // TODO make address configurable + checkState(configBlueprint.contains(NETCONF_CONNECTOR_NAME)); + checkState(configBlueprint.contains(NETCONF_CONNECTOR_PORT)); + checkState(configBlueprint.contains(NETCONF_USE_SSH)); + configBlueprint = configBlueprint.replace(NETCONF_CONNECTOR_NAME, "%s"); + configBlueprint = configBlueprint.replace(NETCONF_CONNECTOR_PORT, "%s"); + configBlueprint = 
configBlueprint.replace(NETCONF_USE_SSH, "%s"); + + final String before = configBlueprint.substring(0, configBlueprint.indexOf("")); + final String middleBlueprint = configBlueprint.substring(configBlueprint.indexOf(""), configBlueprint.indexOf("") + "".length()); + final String after = configBlueprint.substring(configBlueprint.indexOf("") + "".length()); + + int connectorCount = 0; + Integer batchStart = null; + StringBuilder b = new StringBuilder(); + b.append(before); + + for (final Integer openDevice : openDevices) { + if(batchStart == null) { + batchStart = openDevice; + } + + final String name = String.valueOf(openDevice) + SIM_DEVICE_SUFFIX; + final String configContent = String.format(middleBlueprint, name, String.valueOf(openDevice), String.valueOf(!useSsh)); + b.append(configContent); + connectorCount++; + if(connectorCount == batchSize) { + b.append(after); + Files.write(b.toString(), new File(directory, String.format("simulated-devices_%d-%d.xml", batchStart, openDevice)), Charsets.UTF_8); + connectorCount = 0; + b = new StringBuilder(); + b.append(before); + batchStart = null; + } + } + + // Write remaining + if(connectorCount != 0) { + b.append(after); + Files.write(b.toString(), new File(directory, String.format("simulated-devices_%d-%d.xml", batchStart, openDevices.get(openDevices.size() - 1))), Charsets.UTF_8); + } + + LOG.info("Config files generated in {}", directory); + } catch (final IOException e) { + throw new RuntimeException("Unable to generate config files", e); + } + } + } +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/ModuleBuilderCapability.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/ModuleBuilderCapability.java new file mode 100644 index 0000000000..1a68f55e55 --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/ModuleBuilderCapability.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import com.google.common.base.Optional; +import java.util.Date; +import java.util.List; +import org.opendaylight.controller.netconf.confignetconfconnector.util.Util; +import org.opendaylight.controller.netconf.mapping.api.Capability; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleBuilder; + +final class ModuleBuilderCapability implements Capability { + private static final Date NO_REVISION = new Date(0); + private final ModuleBuilder input; + private final Optional content; + + public ModuleBuilderCapability(final ModuleBuilder input, final String inputStream) { + this.input = input; + this.content = Optional.of(inputStream); + } + + @Override + public String getCapabilityUri() { + // FIXME capabilities in Netconf-impl need to check for NO REVISION + final String withoutRevision = getModuleNamespace().get() + "?module=" + getModuleName().get(); + return hasRevision() ? 
withoutRevision + "&revision=" + Util.writeDate(input.getRevision()) : withoutRevision; + } + + @Override + public Optional getModuleNamespace() { + return Optional.of(input.getNamespace().toString()); + } + + @Override + public Optional getModuleName() { + return Optional.of(input.getName()); + } + + @Override + public Optional getRevision() { + return Optional.of(hasRevision() ? QName.formattedRevision(input.getRevision()) : ""); + } + + private boolean hasRevision() { + return !input.getRevision().equals(NO_REVISION); + } + + @Override + public Optional getCapabilitySchema() { + return content; + } + + @Override + public Optional> getLocation() { + return Optional.absent(); + } +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java new file mode 100644 index 0000000000..b21c02ac35 --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java @@ -0,0 +1,356 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.CharStreams; +import com.google.common.util.concurrent.CheckedFuture; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.net.Inet4Address; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.AbstractMap; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.tree.ParseTreeWalker; +import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession; +import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer; +import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher; +import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiatorFactory; +import org.opendaylight.controller.netconf.impl.SessionIdProvider; +import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl; +import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService; +import org.opendaylight.controller.netconf.mapping.api.Capability; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; +import 
org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceSnapshot; +import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringOperationService; +import org.opendaylight.controller.netconf.ssh.NetconfSSHServer; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceListener; +import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache; +import org.opendaylight.yangtools.yang.parser.builder.impl.BuilderUtils; +import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleBuilder; +import org.opendaylight.yangtools.yang.parser.impl.YangParserListenerImpl; +import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository; +import org.opendaylight.yangtools.yang.parser.util.ASTSchemaSource; +import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetconfDeviceSimulator implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceSimulator.class); + + public static final int CONNECTION_TIMEOUT_MILLIS = 20000; + + private final NioEventLoopGroup nettyThreadgroup; + private final HashedWheelTimer hashedWheelTimer; + private final List devicesChannels = Lists.newArrayList(); + + public NetconfDeviceSimulator() { + this(new NioEventLoopGroup(), new HashedWheelTimer()); + } + + public NetconfDeviceSimulator(final NioEventLoopGroup eventExecutors, final HashedWheelTimer hashedWheelTimer) { + this.nettyThreadgroup = eventExecutors; + this.hashedWheelTimer = hashedWheelTimer; + } + + private NetconfServerDispatcher createDispatcher(final Map moduleBuilders) { + + final Set capabilities = Sets.newHashSet(Collections2.transform(moduleBuilders.keySet(), new Function() { + @Override + public Capability apply(final ModuleBuilder input) { + return new ModuleBuilderCapability(input, moduleBuilders.get(input)); + } + })); + + final SessionIdProvider idProvider = new SessionIdProvider(); + + final SimulatedOperationProvider simulatedOperationProvider = new SimulatedOperationProvider(idProvider, capabilities); + final NetconfMonitoringOperationService monitoringService = new NetconfMonitoringOperationService(new NetconfMonitoringServiceImpl(simulatedOperationProvider)); + simulatedOperationProvider.addService(monitoringService); + + final DefaultCommitNotificationProducer commitNotifier = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer()); + + final NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory( + hashedWheelTimer, simulatedOperationProvider, idProvider, CONNECTION_TIMEOUT_MILLIS, commitNotifier, new LoggingMonitoringService()); + + final NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer( + serverNegotiatorFactory); + return new 
NetconfServerDispatcher(serverChannelInitializer, nettyThreadgroup, nettyThreadgroup); + } + + private Map toModuleBuilders(final Map> sources) { + final Map asts = Maps.transformValues(sources, new Function, ParserRuleContext>() { + @Override + public ParserRuleContext apply(final Map.Entry input) { + return input.getKey().getAST(); + } + }); + final Map> namespaceContext = BuilderUtils.createYangNamespaceContext( + asts.values(), Optional.absent()); + + final ParseTreeWalker walker = new ParseTreeWalker(); + final Map sourceToBuilder = new HashMap<>(); + + for (final Map.Entry entry : asts.entrySet()) { + final ModuleBuilder moduleBuilder = YangParserListenerImpl.create(namespaceContext, entry.getKey().getName(), + walker, entry.getValue()).getModuleBuilder(); + + try(InputStreamReader stream = new InputStreamReader(sources.get(entry.getKey()).getValue().openStream(), Charsets.UTF_8)) { + sourceToBuilder.put(moduleBuilder, CharStreams.toString(stream)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + return sourceToBuilder; + } + + + public List start(final Main.Params params) { + final Map moduleBuilders = parseSchemasToModuleBuilders(params); + + final NetconfServerDispatcher dispatcher = createDispatcher(moduleBuilders); + + int currentPort = params.startingPort; + + final List openDevices = Lists.newArrayList(); + for (int i = 0; i < params.deviceCount; i++) { + final InetSocketAddress address = getAddress(currentPort); + + final ChannelFuture server; + if(params.ssh) { + final LocalAddress tcpLocalAddress = new LocalAddress(address.toString()); + + server = dispatcher.createLocalServer(tcpLocalAddress); + try { + NetconfSSHServer.start(currentPort, tcpLocalAddress, new AcceptingAuthProvider(), nettyThreadgroup); + } catch (final Exception e) { + LOG.warn("Cannot start simulated device on {}, skipping", address, e); + // Close local server and continue + server.cancel(true); + if(server.isDone()) { + server.channel().close(); + } + continue; + } finally { + currentPort++; + } + + try { + server.get(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException e) { + LOG.warn("Cannot start ssh simulated device on {}, skipping", address, e); + continue; + } + + LOG.debug("Simulated SSH device started on {}", address); + + } else { + server = dispatcher.createServer(address); + currentPort++; + + try { + server.get(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException e) { + LOG.warn("Cannot start tcp simulated device on {}, skipping", address, e); + continue; + } + + LOG.debug("Simulated TCP device started on {}", address); + } + + devicesChannels.add(server.channel()); + openDevices.add(currentPort - 1); + + } + + if(openDevices.size() == params.deviceCount) { + LOG.info("All simulated devices started successfully from port {} to {}", params.startingPort, currentPort); + } else { + LOG.warn("Not all simulated devices started successfully. 
Started devices ar on ports {}", openDevices); + } + + return openDevices; + } + + private Map parseSchemasToModuleBuilders(final Main.Params params) { + final SharedSchemaRepository consumer = new SharedSchemaRepository("netconf-simulator"); + consumer.registerSchemaSourceListener(TextToASTTransformer.create(consumer, consumer)); + + final Set loadedSources = Sets.newHashSet(); + + consumer.registerSchemaSourceListener(new SchemaSourceListener() { + @Override + public void schemaSourceEncountered(final SchemaSourceRepresentation schemaSourceRepresentation) {} + + @Override + public void schemaSourceRegistered(final Iterable> potentialSchemaSources) { + for (final PotentialSchemaSource potentialSchemaSource : potentialSchemaSources) { + loadedSources.add(potentialSchemaSource.getSourceIdentifier()); + } + } + + @Override + public void schemaSourceUnregistered(final PotentialSchemaSource potentialSchemaSource) {} + }); + + final FilesystemSchemaSourceCache cache = new FilesystemSchemaSourceCache<>(consumer, YangTextSchemaSource.class, params.schemasDir); + consumer.registerSchemaSourceListener(cache); + + final Map> asts = Maps.newHashMap(); + for (final SourceIdentifier loadedSource : loadedSources) { + try { + final CheckedFuture ast = consumer.getSchemaSource(loadedSource, ASTSchemaSource.class); + final CheckedFuture text = consumer.getSchemaSource(loadedSource, YangTextSchemaSource.class); + asts.put(loadedSource, new AbstractMap.SimpleEntry<>(ast.get(), text.get())); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException e) { + throw new RuntimeException("Cannot parse schema context", e); + } + } + return toModuleBuilders(asts); + } + + private static InetSocketAddress getAddress(final int port) { + try { + // TODO make address configurable + return new InetSocketAddress(Inet4Address.getByName("0.0.0.0"), port); + } catch (final UnknownHostException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + for (final Channel deviceCh : devicesChannels) { + deviceCh.close(); + } + nettyThreadgroup.shutdownGracefully(); + // close Everything + } + + private static class SimulatedOperationProvider implements NetconfOperationProvider { + private final SessionIdProvider idProvider; + private final Set netconfOperationServices; + + + public SimulatedOperationProvider(final SessionIdProvider idProvider, final Set caps) { + this.idProvider = idProvider; + final SimulatedOperationService simulatedOperationService = new SimulatedOperationService(caps, idProvider.getCurrentSessionId()); + this.netconfOperationServices = Sets.newHashSet(simulatedOperationService); + } + + @Override + public NetconfOperationServiceSnapshot openSnapshot(final String sessionIdForReporting) { + return new SimulatedServiceSnapshot(idProvider, netconfOperationServices); + } + + public void addService(final NetconfOperationService monitoringService) { + netconfOperationServices.add(monitoringService); + } + + private static class SimulatedServiceSnapshot implements NetconfOperationServiceSnapshot { + private final SessionIdProvider idProvider; + private final Set netconfOperationServices; + + public SimulatedServiceSnapshot(final SessionIdProvider idProvider, final Set netconfOperationServices) { + this.idProvider = idProvider; + this.netconfOperationServices = netconfOperationServices; + } + + @Override + public String getNetconfSessionIdForReporting() { + return String.valueOf(idProvider.getCurrentSessionId()); + } + + @Override + 
public Set getServices() { + return netconfOperationServices; + } + + @Override + public void close() throws Exception {} + } + + static class SimulatedOperationService implements NetconfOperationService { + private final Set capabilities; + private static SimulatedGet sGet; + + public SimulatedOperationService(final Set capabilities, final long currentSessionId) { + this.capabilities = capabilities; + sGet = new SimulatedGet(String.valueOf(currentSessionId)); + } + + @Override + public Set getCapabilities() { + return capabilities; + } + + @Override + public Set getNetconfOperations() { + return Sets.newHashSet(sGet); + } + + @Override + public void close() { + } + + } + } + + private class LoggingMonitoringService implements SessionMonitoringService { + @Override + public void onSessionUp(final NetconfManagementSession session) { + LOG.debug("Session {} established", session); + } + + @Override + public void onSessionDown(final NetconfManagementSession session) { + LOG.debug("Session {} down", session); + } + } + +} diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedGet.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedGet.java new file mode 100644 index 0000000000..b1938c8332 --- /dev/null +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/SimulatedGet.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.test.tool; + +import com.google.common.base.Optional; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; +import org.opendaylight.controller.netconf.confignetconfconnector.operations.AbstractConfigNetconfOperation; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +class SimulatedGet extends AbstractConfigNetconfOperation { + + SimulatedGet(final String netconfSessionIdForReporting) { + super(null, netconfSessionIdForReporting); + } + + @Override + protected Element handleWithNoSubsequentOperations(final Document document, final XmlElement operationElement) throws NetconfDocumentedException { + return XmlUtil.createElement(document, XmlNetconfConstants.DATA_KEY, Optional.absent()); + } + + @Override + protected String getOperationName() { + return XmlNetconfConstants.GET; + } +} diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtil.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtil.java index 4c4ff2fc58..6604834fe4 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtil.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/SendErrorExceptionUtil.java @@ -59,12 +59,12 @@ public final class SendErrorExceptionUtil { final NetconfDocumentedException sendErrorException) 
{ try { final Element incommingRpc = incommingDocument.getDocumentElement(); - Preconditions.checkState(incommingRpc.getTagName().equals(XmlNetconfConstants.RPC_KEY), "Missing " - + XmlNetconfConstants.RPC_KEY + " " + "element"); + Preconditions.checkState(incommingRpc.getTagName().equals(XmlNetconfConstants.RPC_KEY), "Missing %s element", + XmlNetconfConstants.RPC_KEY); final Element rpcReply = errorDocument.getDocumentElement(); - Preconditions.checkState(rpcReply.getTagName().equals(XmlNetconfConstants.RPC_REPLY_KEY), "Missing " - + XmlNetconfConstants.RPC_REPLY_KEY + " element"); + Preconditions.checkState(rpcReply.getTagName().equals(XmlNetconfConstants.RPC_REPLY_KEY), "Missing %s element", + XmlNetconfConstants.RPC_REPLY_KEY); final NamedNodeMap incomingAttributes = incommingRpc.getAttributes(); for (int i = 0; i < incomingAttributes.getLength(); i++) { @@ -97,7 +97,7 @@ public final class SendErrorExceptionUtil { @Override public void operationComplete(final ChannelFuture channelFuture) throws Exception { - Preconditions.checkState(channelFuture.isSuccess(), "Unable to send exception {}", sendErrorException, + Preconditions.checkState(channelFuture.isSuccess(), "Unable to send exception %s", sendErrorException, channelFuture.cause()); } } diff --git a/opendaylight/netconf/pom.xml b/opendaylight/netconf/pom.xml index e55ec697ba..b1b410a1fc 100644 --- a/opendaylight/netconf/pom.xml +++ b/opendaylight/netconf/pom.xml @@ -150,5 +150,15 @@ netconf-it + + + testtool + + false + + + netconf-testtool + + diff --git a/pom.xml b/pom.xml index e4c51b7839..0bfc64f892 100644 --- a/pom.xml +++ b/pom.xml @@ -129,7 +129,9 @@ opendaylight/dummy-console opendaylight/karaf-branding + opendaylight/distribution/opendaylight-karaf-empty opendaylight/distribution/opendaylight-karaf + opendaylight/distribution/opendaylight-karaf-resources features
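
The PUT retry logic added to RestconfImpl above is the one part of this change whose reasoning is spelled out in prose, so a standalone illustration may help. The following is a minimal, self-contained sketch of that bounded-retry strategy, under stated assumptions: Commit, CommitFailedException and OptimisticLockFailedException below are hypothetical stand-ins for the broker commit call and the corresponding OpenDaylight exception classes, not the real APIs, and only the control flow (two attempts, retry on an optimistic-lock conflict, immediate failure otherwise) mirrors the patch.

public final class PutRetrySketch {

    /** Stand-in for TransactionCommitFailedException: a commit that could not be applied. */
    static class CommitFailedException extends Exception {
        CommitFailedException(final String message) { super(message); }
    }

    /** Stand-in for OptimisticLockFailedException: a concurrent writer updated the same data first. */
    static final class OptimisticLockFailedException extends CommitFailedException {
        OptimisticLockFailedException(final String message) { super(message); }
    }

    /** Stand-in for the blocking commit performed via commitConfigurationDataPut(...).checkedGet(). */
    interface Commit {
        void run() throws CommitFailedException;
    }

    /**
     * Runs the commit, retrying once if an optimistic-lock conflict occurs.
     * A second conflict, or any other commit failure, is propagated to the caller.
     */
    static void putWithRetry(final Commit commit) throws CommitFailedException {
        int tries = 2;
        while (true) {
            try {
                commit.run();
                return;
            } catch (final OptimisticLockFailedException e) {
                if (--tries <= 0) {
                    // Second conflict in a row: give up and surface the error.
                    throw e;
                }
                // Likely transient; attempt the same commit once more.
            }
        }
    }

    public static void main(final String[] args) throws CommitFailedException {
        // Simulated commit that conflicts on the first attempt and succeeds on the second.
        final int[] attempt = {0};
        putWithRetry(() -> {
            if (attempt[0]++ == 0) {
                throw new OptimisticLockFailedException("concurrent update detected");
            }
            System.out.println("commit applied on attempt " + attempt[0]);
        });
    }
}

As the comment in the patch explains, the point of retrying on the server side is to keep the datastore's optimistic-locking behaviour internal: a transient conflict between two concurrent writers is resolved by the RESTCONF service itself rather than being pushed back to the client as an error it would have to document and handle.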