From: Ed Warnicke Date: Tue, 6 Jan 2015 23:36:01 +0000 (+0000) Subject: Merge "Fix test case mis-spelling." X-Git-Tag: release/lithium~729 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=488cc48063a540a046084b398c72e5c58d2c7288;hp=0133fc851489ac7ea5f0ca6413175fc8b7fa485b Merge "Fix test case mis-spelling." --- diff --git a/features/config/pom.xml b/features/config/pom.xml index 461427c7ce..1fa248615c 100644 --- a/features/config/pom.xml +++ b/features/config/pom.xml @@ -34,18 +34,6 @@ org.opendaylight.controller sal-common - - org.opendaylight.controller - sal-common-api - - - org.opendaylight.controller - sal-common-impl - - - org.opendaylight.controller - sal-common-util - org.opendaylight.controller config-api diff --git a/features/config/src/main/resources/features.xml b/features/config/src/main/resources/features.xml index b4dd03f491..b2e0b246ef 100644 --- a/features/config/src/main/resources/features.xml +++ b/features/config/src/main/resources/features.xml @@ -6,21 +6,12 @@ mvn:org.opendaylight.yangtools/features-yangtools/${yangtools.version}/xml/features - odl-mdsal-common odl-config-api odl-config-netty-config-api odl-config-core odl-config-manager - - odl-yangtools-data-binding - mvn:org.opendaylight.controller/sal-common/${mdsal.version} - mvn:org.opendaylight.controller/sal-common-api/${mdsal.version} - mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version} - mvn:org.opendaylight.controller/sal-common-util/${mdsal.version} - - mvn:org.opendaylight.controller/config-api/${project.version} odl-yangtools-common @@ -39,7 +30,6 @@ odl-yangtools-common odl-yangtools-binding odl-yangtools-binding-generator - odl-mdsal-common odl-config-api mvn:org.opendaylight.controller/config-util/${project.version} mvn:org.opendaylight.controller/yang-jmx-generator/${project.version} @@ -53,4 +43,4 @@ odl-config-core mvn:org.opendaylight.controller/config-manager/${project.version} - \ No newline at end of file + diff --git a/features/mdsal/pom.xml b/features/mdsal/pom.xml index d81da186b9..5e6afd248f 100644 --- a/features/mdsal/pom.xml +++ b/features/mdsal/pom.xml @@ -139,6 +139,18 @@ sal-akka-raft ${mdsal.version} + + org.opendaylight.controller + sal-common-api + + + org.opendaylight.controller + sal-common-impl + + + org.opendaylight.controller + sal-common-util + org.opendaylight.controller sal-core-spi diff --git a/features/mdsal/src/main/resources/features.xml b/features/mdsal/src/main/resources/features.xml index 540cea1bbc..1582f45789 100644 --- a/features/mdsal/src/main/resources/features.xml +++ b/features/mdsal/src/main/resources/features.xml @@ -14,6 +14,13 @@ odl-mdsal-xsql odl-toaster + + odl-yangtools-data-binding + mvn:org.opendaylight.controller/sal-common/${mdsal.version} + mvn:org.opendaylight.controller/sal-common-api/${mdsal.version} + mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version} + mvn:org.opendaylight.controller/sal-common-util/${mdsal.version} + odl-yangtools-common odl-yangtools-binding diff --git a/opendaylight/karaf-branding/.gitignore b/karaf/karaf-branding/.gitignore similarity index 100% rename from opendaylight/karaf-branding/.gitignore rename to karaf/karaf-branding/.gitignore diff --git a/opendaylight/karaf-branding/pom.xml b/karaf/karaf-branding/pom.xml similarity index 100% rename from opendaylight/karaf-branding/pom.xml rename to karaf/karaf-branding/pom.xml diff --git a/opendaylight/karaf-branding/src/main/resources/org/apache/karaf/branding/branding.properties b/karaf/karaf-branding/src/main/resources/org/apache/karaf/branding/branding.properties similarity index 100% rename from opendaylight/karaf-branding/src/main/resources/org/apache/karaf/branding/branding.properties rename to karaf/karaf-branding/src/main/resources/org/apache/karaf/branding/branding.properties diff --git a/karaf/karaf-parent/pom.xml b/karaf/karaf-parent/pom.xml new file mode 100644 index 0000000000..06d8c8d99b --- /dev/null +++ b/karaf/karaf-parent/pom.xml @@ -0,0 +1,345 @@ + + + + + org.opendaylight.odlparent + odlparent + 1.5.0-SNAPSHOT + + + 4.0.0 + org.opendaylight.controller + karaf-parent + ${project.artifactId} + pom + + 3.1.1 + + + 1.1.0-SNAPSHOT + 1.5.0-SNAPSHOT + + + + + + org.apache.karaf.features + framework + ${karaf.version} + kar + + + org.osgi + org.osgi.core + + + org.apache.sshd + sshd-core + + + + + + + + + org.opendaylight.controller + karaf.branding + ${branding.version} + compile + + + + + org.opendaylight.controller + opendaylight-karaf-resources + ${karaf.resources.version} + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.felix + maven-bundle-plugin + [0,) + + cleanVersions + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + [0,) + + copy + unpack + + + + + + + + + org.apache.karaf.tooling + karaf-maven-plugin + [0,) + + commands-generate-help + features-add-to-repository + install-kars + + + + + + + + + org.fusesource.scalate + maven-scalate-plugin + [0,) + + sitegen + + + + + + + + + org.apache.servicemix.tooling + depends-maven-plugin + [0,) + + generate-depends-file + + + + + + + + + + + + + maven-resources-plugin + + + copy-resources + + prepare-package + + copy-resources + + + ${basedir}/target/assembly + + + src/main/assembly + + + true + + + + + + org.apache.karaf.tooling + karaf-maven-plugin + ${karaf.version} + true + + + + standard + ${karaf.localFeature} + + + + + + populate-system + generate-resources + + features-add-to-repository + + + + mvn:org.apache.karaf.features/standard/${karaf.version}/xml/features + + + standard + config + package + kar + ssh + management + war + + target/assembly/system + + + + process-resources + + install-kars + + process-resources + + + package + + instance-create-archive + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${checkstyle.version} + + **\/target\/,**\/bin\/,**\/target-ide\/,**\/configuration\/initial\/ + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy + + copy + + + generate-resources + + + + org.opendaylight.controller + karaf.branding + ${karaf.branding.version} + target/assembly/lib + karaf.branding-${branding.version}.jar + + + + + + unpack-karaf-resources + + unpack-dependencies + + prepare-package + + ${project.build.directory}/assembly + org.opendaylight.controller + opendaylight-karaf-resources + META-INF\/** + true + false + + + + org.ops4j.pax.url.mvn.cfg + + copy + + prepare-package + + + + org.opendaylight.controller + opendaylight-karaf-resources + properties + config + true + ${project.build.directory}/assembly/etc/ + org.ops4j.pax.url.mvn.cfg + + + true + true + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + prepare-package + + run + + + + + + + + + + + + + + + + + + + + + + + + maven-resources-plugin + + + org.apache.karaf.tooling + karaf-maven-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + org.apache.maven.plugins + maven-dependency-plugin + + + org.apache.maven.plugins + maven-antrun-plugin + + + + + diff --git a/opendaylight/distribution/opendaylight-karaf-empty/pom.xml b/karaf/opendaylight-karaf-empty/pom.xml similarity index 99% rename from opendaylight/distribution/opendaylight-karaf-empty/pom.xml rename to karaf/opendaylight-karaf-empty/pom.xml index a66a502a70..a13023cbee 100644 --- a/opendaylight/distribution/opendaylight-karaf-empty/pom.xml +++ b/karaf/opendaylight-karaf-empty/pom.xml @@ -5,7 +5,7 @@ org.opendaylight.controller commons.opendaylight 1.5.0-SNAPSHOT - ../../commons/opendaylight + ../../opendaylight/commons/opendaylight opendaylight-karaf-empty pom diff --git a/opendaylight/distribution/opendaylight-karaf-resources/pom.xml b/karaf/opendaylight-karaf-resources/pom.xml similarity index 72% rename from opendaylight/distribution/opendaylight-karaf-resources/pom.xml rename to karaf/opendaylight-karaf-resources/pom.xml index 35aac09339..0b2b8eff1f 100644 --- a/opendaylight/distribution/opendaylight-karaf-resources/pom.xml +++ b/karaf/opendaylight-karaf-resources/pom.xml @@ -13,7 +13,7 @@ org.opendaylight.controller commons.opendaylight 1.5.0-SNAPSHOT - ../../commons/opendaylight + ../../opendaylight/commons/opendaylight opendaylight-karaf-resources Resources for opendaylight-karaf @@ -47,6 +47,28 @@ + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg + properties + config + + + + + + diff --git a/karaf/opendaylight-karaf-resources/src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg b/karaf/opendaylight-karaf-resources/src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg new file mode 100644 index 0000000000..9ee45e4dc4 --- /dev/null +++ b/karaf/opendaylight-karaf-resources/src/main/assembly/etc/org.ops4j.pax.url.mvn.cfg @@ -0,0 +1,106 @@ +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ + +# +# If set to true, the following property will not allow any certificate to be used +# when accessing Maven repositories through SSL +# +#org.ops4j.pax.url.mvn.certificateCheck= + +# +# Path to the local Maven settings file. +# The repositories defined in this file will be automatically added to the list +# of default repositories if the 'org.ops4j.pax.url.mvn.repositories' property +# below is not set. +# The following locations are checked for the existence of the settings.xml file +# * 1. looks for the specified url +# * 2. if not found looks for ${user.home}/.m2/settings.xml +# * 3. if not found looks for ${maven.home}/conf/settings.xml +# * 4. if not found looks for ${M2_HOME}/conf/settings.xml +# +#org.ops4j.pax.url.mvn.settings= + +# +# Path to the local Maven repository which is used to avoid downloading +# artifacts when they already exist locally. +# The value of this property will be extracted from the settings.xml file +# above, or defaulted to: +# System.getProperty( "user.home" ) + "/.m2/repository" +# +org.ops4j.pax.url.mvn.localRepository=${karaf.home}/${karaf.default.repository} + +# +# Default this to false. It's just weird to use undocumented repos +# +org.ops4j.pax.url.mvn.useFallbackRepositories=false + +# +# Uncomment if you don't wanna use the proxy settings +# from the Maven conf/settings.xml file +# +# org.ops4j.pax.url.mvn.proxySupport=false + +# +# Disable aether support by default. This ensure that the defaultRepositories +# below will be used +# +#org.ops4j.pax.url.mvn.disableAether=true + +# +# Comma separated list of repositories scanned when resolving an artifact. +# Those repositories will be checked before iterating through the +# below list of repositories and even before the local repository +# A repository url can be appended with zero or more of the following flags: +# @snapshots : the repository contains snaphots +# @noreleases : the repository does not contain any released artifacts +# +# The following property value will add the system folder as a repo. +# +#org.ops4j.pax.url.mvn.defaultRepositories= + +# Use the default local repo (e.g.~/.m2/repository) as a "remote" repo +org.ops4j.pax.url.mvn.defaultLocalRepoAsRemote=false + +# +# Comma separated list of repositories scanned when resolving an artifact. +# The default list includes the following repositories containing releases: +# http://repo1.maven.org/maven2 +# http://repository.apache.org/content/groups/snapshots-group +# http://svn.apache.org/repos/asf/servicemix/m2-repo +# http://repository.springsource.com/maven/bundles/release +# http://repository.springsource.com/maven/bundles/external +# To add repositories to the default ones, prepend '+' to the list of repositories +# to add. +# A repository url can be appended with zero or more of the following flags: +# @snapshots : the repository contains snaphots +# @noreleases : the repository does not contain any released artifacts +# @id=reponid : the id for the repository, just like in the settings.xml this is optional but recomendet +# +# The default list doesn't contain any repository containing snapshots as it can impact the artifacts resolution. +# You may want to add the following repositories containing snapshots: +# http://repository.apache.org/content/groups/snapshots-group@id=apache@snapshots@noreleases +# http://oss.sonatype.org/content/repositories/snapshots@id=sonatype.snapshots.deploy@snapshots@norelease +# http://oss.sonatype.org/content/repositories/ops4j-snapshots@id=ops4j.sonatype.snapshots.deploy@snapshots@noreleases +# +org.ops4j.pax.url.mvn.repositories= \ + file:${karaf.home}/${karaf.default.repository}@id=system.repository, \ + file:${karaf.data}/kar@id=kar.repository@multi, \ + http://repo1.maven.org/maven2@id=central, \ + http://repository.springsource.com/maven/bundles/release@id=spring.ebr.release, \ + http://repository.springsource.com/maven/bundles/external@id=spring.ebr.external diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance b/karaf/opendaylight-karaf-resources/src/main/resources/bin/instance similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance rename to karaf/opendaylight-karaf-resources/src/main/resources/bin/instance diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance.bat b/karaf/opendaylight-karaf-resources/src/main/resources/bin/instance.bat similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/instance.bat rename to karaf/opendaylight-karaf-resources/src/main/resources/bin/instance.bat diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf b/karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf rename to karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat b/karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat rename to karaf/opendaylight-karaf-resources/src/main/resources/bin/karaf.bat diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv b/karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv rename to karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat b/karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat rename to karaf/opendaylight-karaf-resources/src/main/resources/bin/setenv.bat diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/context.xml b/karaf/opendaylight-karaf-resources/src/main/resources/configuration/context.xml similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/context.xml rename to karaf/opendaylight-karaf-resources/src/main/resources/configuration/context.xml diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml b/karaf/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml rename to karaf/opendaylight-karaf-resources/src/main/resources/configuration/logback.xml diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties b/karaf/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties rename to karaf/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-logging.properties diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml b/karaf/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml rename to karaf/opendaylight-karaf-resources/src/main/resources/configuration/tomcat-server.xml diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties b/karaf/opendaylight-karaf-resources/src/main/resources/etc/custom.properties similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties rename to karaf/opendaylight-karaf-resources/src/main/resources/etc/custom.properties diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/version.properties b/karaf/opendaylight-karaf-resources/src/main/resources/version.properties similarity index 100% rename from opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/version.properties rename to karaf/opendaylight-karaf-resources/src/main/resources/version.properties diff --git a/opendaylight/distribution/opendaylight-karaf/pom.xml b/karaf/opendaylight-karaf/pom.xml similarity index 99% rename from opendaylight/distribution/opendaylight-karaf/pom.xml rename to karaf/opendaylight-karaf/pom.xml index 4c8f9c5913..e0ea4c0edf 100644 --- a/opendaylight/distribution/opendaylight-karaf/pom.xml +++ b/karaf/opendaylight-karaf/pom.xml @@ -5,7 +5,7 @@ org.opendaylight.controller commons.opendaylight 1.5.0-SNAPSHOT - ../../commons/opendaylight + ../../opendaylight/commons/opendaylight distribution.opendaylight-karaf pom diff --git a/karaf/pom.xml b/karaf/pom.xml new file mode 100644 index 0000000000..1d37ae1b04 --- /dev/null +++ b/karaf/pom.xml @@ -0,0 +1,28 @@ + + + + 4.0.0 + + + org.opendaylight.controller + commons.parent + 1.1.0-SNAPSHOT + ../opendaylight/commons/parent + + + karaf-aggregator + pom + + karaf-branding + karaf-parent + opendaylight-karaf + opendaylight-karaf-empty + opendaylight-karaf-resources + + diff --git a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java index 80d7083ec0..8c422a52ea 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java +++ b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -22,6 +21,7 @@ import org.opendaylight.controller.configuration.IConfigurationContainerService; import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase; import org.opendaylight.controller.sal.topology.IListenTopoUpdates; import org.opendaylight.controller.sal.topology.ITopologyService; +import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; @@ -73,6 +73,7 @@ public class Activator extends ComponentActivatorAbstractBase { props.put("cachenames", propSet); c.setInterface(new String[] { IListenTopoUpdates.class.getName(), + IInventoryListener.class.getName(), ITopologyManager.class.getName(), ITopologyManagerShell.class.getName(), IConfigurationContainerAware.class.getName(), diff --git a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java index 659ee7dd81..e1a0ca1e76 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java +++ b/opendaylight/adsal/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader; import org.opendaylight.controller.sal.utils.NodeConnectorCreator; import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.controller.switchmanager.IInventoryListener; import org.opendaylight.controller.switchmanager.ISwitchManager; import org.opendaylight.controller.topologymanager.ITopologyManager; import org.opendaylight.controller.topologymanager.ITopologyManagerAware; @@ -58,6 +59,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -76,6 +79,7 @@ public class TopologyManagerImpl implements IConfigurationContainerAware, IListenTopoUpdates, IObjectReader, + IInventoryListener, CommandProvider { protected static final String TOPOEDGESDB = "topologymanager.edgesDB"; protected static final String TOPOHOSTSDB = "topologymanager.hostsDB"; @@ -83,6 +87,8 @@ public class TopologyManagerImpl implements protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB"; private static final String USER_LINKS_FILE_NAME = "userTopology.conf"; private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class); + private static final long PENDING_UPDATE_TIMEOUT = 5000L; + private ITopologyService topoService; private IClusterContainerServices clusterContainerService; private IConfigurationContainerService configurationService; @@ -104,7 +110,79 @@ public class TopologyManagerImpl implements private BlockingQueue notifyQ = new LinkedBlockingQueue(); private volatile Boolean shuttingDown = false; private Thread notifyThread; + private final Map> pendingUpdates = + new HashMap>(); + private final BlockingQueue updateQ = + new LinkedBlockingQueue(); + private Timer pendingTimer; + private Thread updateThread; + + private class PendingEdgeUpdate extends TopoEdgeUpdate { + private PendingEdgeUpdate(Edge e, Set p, UpdateType t) { + super(e, p, t); + } + } + + private class UpdateTopology implements Runnable { + @Override + public void run() { + log.trace("Start topology update thread"); + + while (!shuttingDown) { + try { + List list = new ArrayList(); + TopoEdgeUpdate teu = updateQ.take(); + for (; teu != null; teu = updateQ.poll()) { + list.add(teu); + } + + if (!list.isEmpty()) { + log.trace("Update edges: {}", list); + doEdgeUpdate(list); + } + } catch (InterruptedException e) { + if (shuttingDown) { + break; + } + log.warn("Topology update thread interrupted", e); + } catch (Exception e) { + log.error("Exception on topology update thread", e); + } + } + + log.trace("Exit topology update thread"); + } + } + + private class PendingUpdateTask extends TimerTask { + private final Edge edge; + private final Set props; + private final UpdateType type; + + private PendingUpdateTask(Edge e, Set p, UpdateType t) { + edge = e; + props = p; + type = t; + } + + private NodeConnector getHeadNodeConnector() { + return edge.getHeadNodeConnector(); + } + + private void flush() { + log.info("Flush pending topology update: edge {}, type {}", + edge, type); + updateQ.add(new PendingEdgeUpdate(edge, props, type)); + } + @Override + public void run() { + if (removePendingEvent(this)) { + log.warn("Pending topology update timed out: edge{}, type {}", + edge, type); + } + } + } void nonClusterObjectCreate() { edgesDB = new ConcurrentHashMap>(); @@ -210,6 +288,8 @@ public class TopologyManagerImpl implements // Restore the shuttingDown status on init of the component shuttingDown = false; notifyThread = new Thread(new TopologyNotify(notifyQ)); + pendingTimer = new Timer("Topology Pending Update Timer"); + updateThread = new Thread(new UpdateTopology(), "Topology Update"); } @SuppressWarnings({ "unchecked" }) @@ -277,6 +357,8 @@ public class TopologyManagerImpl implements * */ void started() { + updateThread.start(); + // Start the batcher thread for the cluster wide topology updates notifyThread.start(); // SollicitRefresh MUST be called here else if called at init @@ -287,7 +369,9 @@ public class TopologyManagerImpl implements void stop() { shuttingDown = true; + updateThread.interrupt(); notifyThread.interrupt(); + pendingTimer.cancel(); } /** @@ -297,6 +381,9 @@ public class TopologyManagerImpl implements * */ void destroy() { + updateQ.clear(); + updateThread = null; + pendingTimer = null; notifyQ.clear(); notifyThread = null; } @@ -571,17 +658,100 @@ public class TopologyManagerImpl implements return (switchManager.doesNodeConnectorExist(head)); } + private void addPendingEvent(Edge e, Set p, UpdateType t) { + NodeConnector head = e.getHeadNodeConnector(); + PendingUpdateTask task = new PendingUpdateTask(e, p, t); + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list == null) { + list = new LinkedList(); + pendingUpdates.put(head, list); + } + list.add(task); + pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT); + } + } + + private boolean enqueueEventIfPending(Edge e, Set p, UpdateType t) { + NodeConnector head = e.getHeadNodeConnector(); + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list != null) { + log.warn("Enqueue edge update: edge {}, type {}", e, t); + PendingUpdateTask task = new PendingUpdateTask(e, p, t); + list.add(task); + pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT); + return true; + } + } + + return false; + } + + private boolean removePendingEvent(PendingUpdateTask t) { + t.cancel(); + NodeConnector head = t.getHeadNodeConnector(); + boolean removed = false; + + synchronized (pendingUpdates) { + List list = pendingUpdates.get(head); + if (list != null) { + removed = list.remove(t); + if (list.isEmpty()) { + pendingUpdates.remove(head); + } + } + } + + return removed; + } + + private void removePendingEvent(NodeConnector head, boolean doFlush) { + List list; + synchronized (pendingUpdates) { + list = pendingUpdates.remove(head); + } + + if (list != null) { + for (PendingUpdateTask task : list) { + if (task.cancel() && doFlush) { + task.flush(); + } + } + pendingTimer.purge(); + } + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props) { - switch (type) { - case ADDED: + return edgeUpdate(e, type, props, false); + } + private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set props, boolean isPending) { + if (!type.equals(UpdateType.ADDED) && + enqueueEventIfPending(e, props, type)) { + return null; + } + switch (type) { + case ADDED: if (this.edgesDB.containsKey(e)) { // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks log.trace("Skipping redundant edge addition: {}", e); return null; } + // Ensure that head node connector exists + if (!isPending) { + if (headNodeConnectorExist(e)) { + removePendingEvent(e.getHeadNodeConnector(), true); + } else { + log.warn("Ignore edge that contains invalid node connector: {}", + e); + addPendingEvent(e, props, type); + return null; + } + } + // Make sure the props are non-null or create a copy if (props == null) { props = new HashSet(); @@ -589,13 +759,6 @@ public class TopologyManagerImpl implements props = new HashSet(props); } - - // Ensure that head node connector exists - if (!headNodeConnectorExist(e)) { - log.warn("Ignore edge that contains invalid node connector: {}", e); - return null; - } - // Check if nodeConnectors of the edge were correctly categorized // by protocol plugin crossCheckNodeConnectors(e); @@ -702,16 +865,16 @@ public class TopologyManagerImpl implements return new TopoEdgeUpdate(e, props, type); } - @Override - public void edgeUpdate(List topoedgeupdateList) { + private void doEdgeUpdate(List topoedgeupdateList) { List teuList = new ArrayList(); - for (int i = 0; i < topoedgeupdateList.size(); i++) { - Edge e = topoedgeupdateList.get(i).getEdge(); - Set p = topoedgeupdateList.get(i).getProperty(); - UpdateType type = topoedgeupdateList.get(i).getUpdateType(); - TopoEdgeUpdate teu = edgeUpdate(e, type, p); - if (teu != null) { - teuList.add(teu); + for (TopoEdgeUpdate teu : topoedgeupdateList) { + boolean isPending = (teu instanceof PendingEdgeUpdate); + Edge e = teu.getEdge(); + Set p = teu.getProperty(); + UpdateType type = teu.getUpdateType(); + TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending); + if (update != null) { + teuList.add(update); } } @@ -727,6 +890,11 @@ public class TopologyManagerImpl implements } } + @Override + public void edgeUpdate(List topoedgeupdateList) { + updateQ.addAll(topoedgeupdateList); + } + private Edge getReverseLinkTuple(TopologyUserLinkConfig link) { TopologyUserLinkConfig rLink = new TopologyUserLinkConfig( link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector()); @@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements notifyQ.add(upd); } + @Override + public void notifyNode(Node node, UpdateType type, Map propMap) { + // NOP + } + + @Override + public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map propMap) { + // Remove pending edge updates for the given node connector. + // Pending events should be notified if the node connector exists. + boolean doFlush = !type.equals(UpdateType.REMOVED); + removePendingEvent(nc, doFlush); + } + @Override public void entryCreated(final Object key, final String cacheName, final boolean originLocal) { if (cacheName.equals(TOPOEDGESDB)) { @@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements return result; } + // Only for unit test. + void startTest() { + pendingTimer = new Timer("Topology Pending Update Timer"); + updateThread = new Thread(new UpdateTopology(), "Topology Update"); + updateThread.start(); + } + + void stopTest() { + shuttingDown = true; + updateThread.interrupt(); + pendingTimer.cancel(); + } + + boolean flushUpdateQueue(long timeout) { + long limit = System.currentTimeMillis() + timeout; + long cur; + do { + if (updateQ.peek() == null) { + return true; + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + cur = System.currentTimeMillis(); + } while (cur < limit); + + return false; + } } diff --git a/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java b/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java index d1338bf695..600f1d8cbf 100644 --- a/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java +++ b/opendaylight/adsal/topologymanager/implementation/src/test/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImplTest.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.topologymanager.internal; import org.junit.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.sal.core.Bandwidth; import org.opendaylight.controller.sal.core.ConstructionException; @@ -50,6 +52,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; public class TopologyManagerImplTest { + private TopologyManagerImpl topoManagerImpl; + /** * Mockup of switch manager that only maintains existence of node * connector. @@ -78,6 +82,11 @@ public class TopologyManagerImplTest { } } + private void clear() { + nodeSet.clear(); + nodeConnectorSet.clear(); + } + @Override public Status addSubnet(SubnetConfig configObject) { return null; @@ -325,6 +334,20 @@ public class TopologyManagerImplTest { } } + @Before + public void setUp() { + topoManagerImpl = new TopologyManagerImpl(); + topoManagerImpl.startTest(); + } + + @After + public void tearDown() { + if (topoManagerImpl != null) { + topoManagerImpl.stopTest(); + topoManagerImpl = null; + } + } + /* * Sets the node, edges and properties for edges here: Edge : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>; @@ -375,11 +398,12 @@ public class TopologyManagerImplTest { topoedgeupdateList.add(teu2); topoManagerImpl.edgeUpdate(topoedgeupdateList); } + + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); } @Test public void testGetNodeEdges() throws ConstructionException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); setNodeEdges(topoManagerImpl, swMgr); @@ -412,7 +436,6 @@ public class TopologyManagerImplTest { @Test public void testGetEdges() throws ConstructionException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); setNodeEdges(topoManagerImpl, swMgr); @@ -496,7 +519,6 @@ public class TopologyManagerImplTest { TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20", "OF|10@OF|20", "OF|10@OF|30"); - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -529,7 +551,6 @@ public class TopologyManagerImplTest { public void testGetUserLink() { TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5]; TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5]; - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -614,7 +635,6 @@ public class TopologyManagerImplTest { @Test public void testHostLinkMethods() throws ConstructionException, UnknownHostException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -678,7 +698,6 @@ public class TopologyManagerImplTest { @Test public void testGetNodesWithNodeConnectorHost() throws ConstructionException, UnknownHostException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -738,7 +757,6 @@ public class TopologyManagerImplTest { @Test public void bug1348FixTest() throws ConstructionException { - TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl(); TestSwitchManager swMgr = new TestSwitchManager(); topoManagerImpl.setSwitchManager(swMgr); topoManagerImpl.nonClusterObjectCreate(); @@ -763,7 +781,91 @@ public class TopologyManagerImplTest { Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage()); } + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); Assert.assertEquals(1, topoManagerImpl.getEdges().size()); Assert.assertNotNull(topoManagerImpl.getEdges().get(edge)); } + + @Test + public void testNotifyNodeConnector() throws ConstructionException { + TestSwitchManager swMgr = new TestSwitchManager(); + topoManagerImpl.setSwitchManager(swMgr); + topoManagerImpl.nonClusterObjectCreate(); + + // Test NodeConnector notification in the case that there are no + // related edge updates. + NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector( + (short) 1, NodeCreator.createOFNode(1000L)); + Map propMap = new HashMap<>(); + swMgr.addNodeConnectors(nc1); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + + swMgr.clear(); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + + // Test NodeConnector notification in the case that there is a related + // edge update just before the notification. + NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector( + (short) 2, NodeCreator.createOFNode(2000L)); + Edge edge1 = new Edge(nc1, nc2); + Edge edge2 = new Edge(nc2, nc1); + Set props = new HashSet(); + TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED); + TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED); + List topoedgeupdateList = new ArrayList(); + topoedgeupdateList.add(teu1); + topoedgeupdateList.add(teu2); + topoManagerImpl.edgeUpdate(topoedgeupdateList); + swMgr.addNodeConnectors(nc1); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap); + swMgr.addNodeConnectors(nc2); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(2, topoManagerImpl.getEdges().size()); + + teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED); + teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED); + topoedgeupdateList = new ArrayList(); + topoedgeupdateList.add(teu1); + topoedgeupdateList.add(teu2); + topoManagerImpl.edgeUpdate(topoedgeupdateList); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap); + + swMgr.clear(); + + // Test NodeConnector notification in the case that there are multiple + // edge updates related to the NodeConnector just before the notification. + teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED); + teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED); + TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED); + TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED); + TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED); + TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED); + topoedgeupdateList = new ArrayList(); + topoedgeupdateList.add(teu1); + topoedgeupdateList.add(teu2); + topoedgeupdateList.add(teu3); + topoedgeupdateList.add(teu4); + topoedgeupdateList.add(teu5); + topoedgeupdateList.add(teu6); + topoManagerImpl.edgeUpdate(topoedgeupdateList); + swMgr.addNodeConnectors(nc1); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap); + swMgr.addNodeConnectors(nc2); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap); + topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap); + Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000)); + Assert.assertEquals(0, topoManagerImpl.getEdges().size()); + } } diff --git a/opendaylight/config/config-parent/pom.xml b/opendaylight/config/config-parent/pom.xml new file mode 100644 index 0000000000..0b2b634170 --- /dev/null +++ b/opendaylight/config/config-parent/pom.xml @@ -0,0 +1,154 @@ + + + + + org.opendaylight.yangtools + binding-parent + 0.7.0-SNAPSHOT + + + + 4.0.0 + org.opendaylight.controller + config-parent + 0.3.0-SNAPSHOT + pom + + + 0.3.0-SNAPSHOT + 1.2.0-SNAPSHOT + src/main/yang-gen-config + src/main/config/default-config.xml + + + + + + + org.opendaylight.controller + config-artifacts + ${config.version} + pom + import + + + org.opendaylight.controller + mdsal-artifacts + ${mdsal.version} + pom + import + + + + + + + org.opendaylight.controller + config-api + + + org.opendaylight.controller + sal-binding-config + + + + + + + org.opendaylight.yangtools + yang-maven-plugin + + + org.opendaylight.controller + yang-jmx-generator-plugin + ${config.version} + + + + + config + + generate-sources + + + + + org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator + ${jmxGeneratorPath} + + urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang + + + + true + + + + + + maven-clean-plugin + + + + ${jmxGeneratorPath} + + ** + + + + + + + + + + + + + ${config.file} + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${config.file} + xml + config + + + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + + + + diff --git a/opendaylight/config/pom.xml b/opendaylight/config/pom.xml index fc447aa7f9..dc13989ab1 100644 --- a/opendaylight/config/pom.xml +++ b/opendaylight/config/pom.xml @@ -43,6 +43,7 @@ config-netty-config config-artifacts + config-parent diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java index 59bec91511..8022e72157 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java @@ -13,7 +13,6 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; -import akka.japi.Creator; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; @@ -27,14 +26,8 @@ public class ClientActor extends UntypedActor { this.target = target; } - public static Props props(final ActorRef target){ - return Props.create(new Creator(){ - private static final long serialVersionUID = 1L; - - @Override public ClientActor create() throws Exception { - return new ClientActor(target); - } - }); + public static Props props(final ActorRef target) { + return Props.create(ClientActor.class, target); } @Override public void onReceive(Object message) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 6c65021d86..684c3ac30e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Optional; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; @@ -53,13 +52,8 @@ public class ExampleActor extends RaftActor { } public static Props props(final String id, final Map peerAddresses, - final Optional configParams){ - return Props.create(new Creator(){ - - @Override public ExampleActor create() throws Exception { - return new ExampleActor(id, peerAddresses, configParams); - } - }); + final Optional configParams) { + return Props.create(ExampleActor.class, id, peerAddresses, configParams); } @Override public void onReceiveCommand(Object message) throws Exception{ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java index c0ee095367..1676a41c56 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java @@ -1,10 +1,8 @@ package org.opendaylight.controller.cluster.example; -import akka.actor.Actor; import akka.actor.ActorRef; import akka.actor.Cancellable; import akka.actor.Props; -import akka.japi.Creator; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -46,12 +44,7 @@ public class ExampleRoleChangeListener extends AbstractUntypedActor implements A } public static Props getProps(final String memberName) { - return Props.create(new Creator() { - @Override - public Actor create() throws Exception { - return new ExampleRoleChangeListener(memberName); - } - }); + return Props.create(ExampleRoleChangeListener.class, memberName); } @Override diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java index a9aa56174d..d065f6d211 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java @@ -8,11 +8,9 @@ package org.opendaylight.controller.cluster.notifications; -import akka.actor.Actor; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; import akka.serialization.Serialization; import com.google.common.collect.Maps; import java.util.Map; @@ -35,12 +33,7 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos } public static Props getProps(final String memberId) { - return Props.create(new Creator() { - @Override - public Actor create() throws Exception { - return new RoleChangeNotifier(memberId); - } - }); + return Props.create(RoleChangeNotifier.class, memberId); } @Override diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java index 60efb9d7ca..b706d20d1a 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/common/actor/MeteredBoundedMailboxTest.java @@ -12,7 +12,6 @@ import akka.actor.ActorSystem; import akka.actor.DeadLetter; import akka.actor.Props; import akka.actor.UntypedActor; -import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.junit.After; import org.junit.Before; @@ -80,13 +79,7 @@ public class MeteredBoundedMailboxTest { } public static Props props(final ReentrantLock lock){ - return Props.create(new Creator(){ - private static final long serialVersionUID = 1L; - @Override - public PingPongActor create() throws Exception { - return new PingPongActor(lock); - } - }); + return Props.create(PingPongActor.class, lock); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index 9a77e4d568..6f14af304f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -62,7 +62,11 @@ public class DataChangeListener extends AbstractUntypedActor { LOG.debug("Sending change notification {} to listener {}", change, listener); - this.listener.onDataChanged(change); + try { + this.listener.onDataChanged(change); + } catch (RuntimeException e) { + LOG.error( String.format( "Error notifying listener %s", this.listener ), e ); + } // It seems the sender is never null but it doesn't hurt to check. If the caller passes in // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java index d5a12c73c5..25d47388fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -13,6 +13,7 @@ import org.opendaylight.controller.cluster.datastore.messages.EnableNotification import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class DataChangeListenerTest extends AbstractActorTest { @@ -92,4 +93,38 @@ public class DataChangeListenerTest extends AbstractActorTest { } }}; } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testDataChangedWithListenerRuntimeEx(){ + new JavaTestKit(getSystem()) {{ + AsyncDataChangeEvent mockChangeEvent1 = Mockito.mock(AsyncDataChangeEvent.class); + AsyncDataChangeEvent mockChangeEvent2 = Mockito.mock(AsyncDataChangeEvent.class); + AsyncDataChangeEvent mockChangeEvent3 = Mockito.mock(AsyncDataChangeEvent.class); + + AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class); + Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2); + + Props props = DataChangeListener.props(mockListener); + ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx"); + + // Let the DataChangeListener know that notifications should be enabled + subject.tell(new EnableNotification(true), getRef()); + + SchemaContext schemaContext = CompositeModel.createTestContext(); + + subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef()); + expectMsgClass(DataChangedReply.class); + + subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef()); + expectMsgClass(DataChangedReply.class); + + subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef()); + expectMsgClass(DataChangedReply.class); + + Mockito.verify(mockListener).onDataChanged(mockChangeEvent1); + Mockito.verify(mockListener).onDataChanged(mockChangeEvent2); + Mockito.verify(mockListener).onDataChanged(mockChangeEvent3); + }}; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java index 8b4ce31d2e..d24ed5651a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -73,6 +73,8 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext config.getRpcManagerName()); LOG.debug("rpc manager started"); + + schemaService.registerSchemaContextListener(this); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 2046e419d9..31aac92051 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -13,14 +13,17 @@ import akka.actor.Props; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Pair; + import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; + import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; import org.opendaylight.controller.remote.rpc.utils.LatestEntryRoutingLogic; import org.opendaylight.controller.remote.rpc.utils.RoutingLogic; @@ -53,7 +56,7 @@ public class RpcBroker extends AbstractUntypedActor { private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); private final Broker.ProviderSession brokerSession; private final ActorRef rpcRegistry; - private final SchemaContext schemaContext; + private SchemaContext schemaContext; private final RemoteRpcProviderConfig config; private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, @@ -75,9 +78,15 @@ public class RpcBroker extends AbstractUntypedActor { invokeRemoteRpc((InvokeRpc) message); } else if(message instanceof ExecuteRpc) { executeRpc((ExecuteRpc) message); + } else if(message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); } } + private void updateSchemaContext(UpdateSchemaContext message) { + this.schemaContext = message.getSchemaContext(); + } + private void invokeRemoteRpc(final InvokeRpc msg) { if(LOG.isDebugEnabled()) { LOG.debug("Looking up the remote actor for rpc {}", msg.getRpc()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index a840712999..4cbce63f9a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -13,8 +13,8 @@ import akka.actor.ActorRef; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; -import akka.japi.Creator; import akka.japi.Function; +import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; @@ -26,8 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.util.Set; - /** * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. * @@ -61,17 +59,10 @@ public class RpcManager extends AbstractUntypedActor { } - public static Props props(final SchemaContext schemaContext, - final Broker.ProviderSession brokerSession, - final RpcProvisionRegistry rpcProvisionRegistry) { - return Props.create(new Creator() { - private static final long serialVersionUID = 1L; - @Override - public RpcManager create() throws Exception { - return new RpcManager(schemaContext, brokerSession, rpcProvisionRegistry); - } - }); - } + public static Props props(final SchemaContext schemaContext, final Broker.ProviderSession brokerSession, + final RpcProvisionRegistry rpcProvisionRegistry) { + return Props.create(RpcManager.class, schemaContext, brokerSession, rpcProvisionRegistry); + } private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); @@ -123,6 +114,7 @@ public class RpcManager extends AbstractUntypedActor { private void updateSchemaContext(UpdateSchemaContext message) { this.schemaContext = message.getSchemaContext(); + rpcBroker.tell(message, ActorRef.noSender()); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index fe8c463d2e..52b1106c87 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -10,23 +10,22 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.japi.Option; import akka.japi.Pair; -import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; -import org.opendaylight.controller.sal.connector.api.RpcRouter; - import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; +import org.opendaylight.controller.sal.connector.api.RpcRouter; public class RoutingTable implements Copier, Serializable { private static final long serialVersionUID = 1L; - private Map, Long> table = new HashMap<>(); + private final Map, Long> table = new HashMap<>(); private ActorRef router; @Override public RoutingTable copy() { RoutingTable copy = new RoutingTable(); - copy.setTable(new HashMap<>(table)); + copy.table.putAll(table); copy.setRouter(this.getRouter()); return copy; @@ -35,10 +34,11 @@ public class RoutingTable implements Copier, Serializable { public Option> getRouterFor(RpcRouter.RouteIdentifier routeId){ Long updatedTime = table.get(routeId); - if (updatedTime == null || router == null) + if (updatedTime == null || router == null) { return Option.none(); - else + } else { return Option.option(new Pair<>(router, updatedTime)); + } } public void addRoute(RpcRouter.RouteIdentifier routeId){ @@ -49,23 +49,16 @@ public class RoutingTable implements Copier, Serializable { table.remove(routeId); } - public Boolean contains(RpcRouter.RouteIdentifier routeId){ + public boolean contains(RpcRouter.RouteIdentifier routeId){ return table.containsKey(routeId); } - public Boolean isEmpty(){ + public boolean isEmpty(){ return table.isEmpty(); } - /// - /// Getter, Setters - /// - //TODO: Remove public - public Map, Long> getTable() { - return table; - } - void setTable(Map, Long> table) { - this.table = table; + public int size() { + return table.size(); } public ActorRef getRouter() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 095d70926b..845c1c819a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,36 +8,21 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; -import akka.actor.Address; -import akka.actor.Props; -import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Option; import akka.japi.Pair; -import akka.pattern.Patterns; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import scala.concurrent.Future; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; /** * Registry to look up cluster nodes that have registered for a given rpc. @@ -45,51 +30,29 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ -public class RpcRegistry extends AbstractUntypedActorWithMetering { +public class RpcRegistry extends BucketStore { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - /** - * Store to keep the registry. Bucket store sync's it across nodes in the cluster - */ - private ActorRef bucketStore; - - /** - * Rpc broker that would use the registry to route requests. - */ - private ActorRef localRouter; - - private RemoteRpcProviderConfig config; - public RpcRegistry() { - bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); - this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); - log.info("Bucket store path = {}", bucketStore.path().toString()); + getLocalBucket().setData(new RoutingTable()); } - public RpcRegistry(ActorRef bucketStore) { - this.bucketStore = bucketStore; - } - - @Override protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message - if (message instanceof SetLocalRouter) + if (message instanceof SetLocalRouter) { receiveSetLocalRouter((SetLocalRouter) message); - - if (message instanceof AddOrUpdateRoutes) + } else if (message instanceof AddOrUpdateRoutes) { receiveAddRoutes((AddOrUpdateRoutes) message); - - else if (message instanceof RemoveRoutes) + } else if (message instanceof RemoveRoutes) { receiveRemoveRoutes((RemoveRoutes) message); - - else if (message instanceof Messages.FindRouters) + } else if (message instanceof Messages.FindRouters) { receiveGetRouter((FindRouters) message); - - else - unhandled(message); + } else { + super.handleReceive(message); + } } /** @@ -98,7 +61,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { * @param message contains {@link akka.actor.ActorRef} for rpc broker */ private void receiveSetLocalRouter(SetLocalRouter message) { - localRouter = message.getRouter(); + getLocalBucket().getData().setRouter(message.getRouter()); } /** @@ -106,10 +69,14 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { */ private void receiveAddRoutes(AddOrUpdateRoutes msg) { - Preconditions.checkState(localRouter != null, "Router must be set first"); + log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); + + RoutingTable table = getLocalBucket().getData().copy(); + for(RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.addRoute(routeId); + } - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); - futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + updateLocalBucket(table); } /** @@ -117,9 +84,12 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { */ private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); - futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); + RoutingTable table = getLocalBucket().getData().copy(); + for (RpcRouter.RouteIdentifier routeId : msg.getRouteIdentifiers()) { + table.removeRoute(routeId); + } + updateLocalBucket(table); } /** @@ -128,168 +98,28 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering { * @param msg */ private void receiveGetRouter(FindRouters msg) { - final ActorRef sender = getSender(); - - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration()); - futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); - } - - /** - * Helper to create empty reply when no routers are found - * - * @return - */ - private Messages.FindRoutersReply createEmptyReply() { - List> routerWithUpdateTime = Collections.emptyList(); - return new Messages.FindRoutersReply(routerWithUpdateTime); - } - - /** - * Helper to create a reply when routers are found for the given rpc - * - * @param buckets - * @param routeId - * @return - */ - private Messages.FindRoutersReply createReplyWithRouters( - Map buckets, RpcRouter.RouteIdentifier routeId) { - List> routers = new ArrayList<>(); - Option> routerWithUpdateTime = null; - - for (Bucket bucket : buckets.values()) { - - RoutingTable table = (RoutingTable) bucket.getData(); - if (table == null) - continue; - routerWithUpdateTime = table.getRouterFor(routeId); - if (routerWithUpdateTime.isEmpty()) - continue; + RouteIdentifier routeId = msg.getRouteIdentifier(); + findRoutes(getLocalBucket().getData(), routeId, routers); - routers.add(routerWithUpdateTime.get()); + for(Bucket bucket : getRemoteBuckets().values()) { + findRoutes(bucket.getData(), routeId, routers); } - return new Messages.FindRoutersReply(routers); - } - - - /// - ///private factories to create Mapper - /// - - /** - * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found - * - * @param routeId the rpc - * @param sender client who asked to find the routers. - * @return - */ - private Mapper getMapperToGetRouter( - final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - - if (replyMessage instanceof GetAllBucketsReply) { - - GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage; - Map buckets = reply.getBuckets(); - - if (buckets == null || buckets.isEmpty()) { - sender.tell(createEmptyReply(), getSelf()); - return null; - } - - sender.tell(createReplyWithRouters(buckets, routeId), getSelf()); - } - return null; - } - }; - } - - /** - * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to remote - * @return - */ - private Mapper getMapperToRemoveRoutes(final List> routeIds) { - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - - if (!table.isEmpty()) { - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.removeRoute(routeId); - } - } - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } - return null; - } - }; + getSender().tell(new Messages.FindRoutersReply(routers), getSelf()); } - /** - * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently, - * it updates the local bucket in bucket store. - * - * @param routeIds rpc to add - * @return - */ - private Mapper getMapperToAddRoutes(final List> routeIds) { - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetLocalBucketReply) { - - GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage; - Bucket bucket = reply.getBucket(); - - if (bucket == null) { - log.debug("Local bucket is null"); - return null; - } - - RoutingTable table = bucket.getData(); - if (table == null) - table = new RoutingTable(); - - table.setRouter(localRouter); - for (RpcRouter.RouteIdentifier routeId : routeIds) { - table.addRoute(routeId); - } - - bucket.setData(table); - - UpdateBucket updateBucketMessage = new UpdateBucket(bucket); - bucketStore.tell(updateBucketMessage, getSelf()); - } + private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier routeId, + List> routers) { + if (table == null) { + return; + } - return null; - } - }; + Option> routerWithUpdateTime = table.getRouterFor(routeId); + if(!routerWithUpdateTime.isEmpty()) { + routers.add(routerWithUpdateTime.get()); + } } /** diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java index f5dfbc5650..c40fc9349e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java @@ -11,5 +11,4 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; public interface Bucket> { public Long getVersion(); public T getData(); - public void setData(T data); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index 01c77f1f08..b81175e9a2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -16,6 +16,23 @@ public class BucketImpl> implements Bucket, Serializable private T data; + public BucketImpl() { + } + + public BucketImpl(T data) { + this.data = data; + } + + public BucketImpl(Bucket other) { + this.version = other.getVersion(); + this.data = other.getData(); + } + + public void setData(T data) { + this.data = data; + this.version = System.currentTimeMillis()+1; + } + @Override public Long getVersion() { return version; @@ -23,15 +40,7 @@ public class BucketImpl> implements Bucket, Serializable @Override public T getData() { - if (this.data == null) - return null; - - return data.copy(); - } - - public void setData(T data){ - this.version = System.currentTimeMillis()+1; - this.data = data; + return data; } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 6ffe147e71..934609b7cf 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -15,11 +15,10 @@ import akka.actor.Props; import akka.cluster.ClusterActorRefProvider; import akka.event.Logging; import akka.event.LoggingAdapter; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; @@ -28,9 +27,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; @@ -43,24 +39,26 @@ import org.opendaylight.controller.utils.ConditionalProbe; * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore extends AbstractUntypedActorWithMetering { +public class BucketStore> extends AbstractUntypedActorWithMetering { + + private static final Long NO_VERSION = -1L; final LoggingAdapter log = Logging.getLogger(getContext().system(), this); /** * Bucket owned by the node */ - private BucketImpl localBucket = new BucketImpl(); + private final BucketImpl localBucket = new BucketImpl<>(); /** * Buckets ownded by other known nodes in the cluster */ - private ConcurrentMap remoteBuckets = new ConcurrentHashMap<>(); + private final Map> remoteBuckets = new HashMap<>(); /** * Bucket version for every known node in the cluster including this node */ - private ConcurrentMap versions = new ConcurrentHashMap<>(); + private final Map versions = new HashMap<>(); /** * Cluster address for this node @@ -85,7 +83,6 @@ public class BucketStore extends AbstractUntypedActorWithMetering { } } - @Override protected void handleReceive(Object message) throws Exception { if (probe != null) { @@ -98,20 +95,14 @@ public class BucketStore extends AbstractUntypedActorWithMetering { probe = (ConditionalProbe) message; // Send back any message to tell the caller we got the probe. getSender().tell("Got it", getSelf()); - } else if (message instanceof UpdateBucket) { - receiveUpdateBucket(((UpdateBucket) message).getBucket()); } else if (message instanceof GetAllBuckets) { - receiveGetAllBucket(); - } else if (message instanceof GetLocalBucket) { - receiveGetLocalBucket(); + receiveGetAllBuckets(); } else if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers( - ((GetBucketsByMembers) message).getMembers()); + receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets( - ((UpdateRemoteBuckets) message).getBuckets()); + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); } else { if(log.isDebugEnabled()) { log.debug("Unhandled message [{}]", message); @@ -120,30 +111,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering { } } - /** - * Returns a copy of bucket owned by this node - */ - private void receiveGetLocalBucket() { - final ActorRef sender = getSender(); - GetLocalBucketReply reply = new GetLocalBucketReply(localBucket); - sender.tell(reply, getSelf()); - } - - /** - * Updates the bucket owned by this node - * - * @param updatedBucket - */ - void receiveUpdateBucket(Bucket updatedBucket){ - - localBucket = (BucketImpl) updatedBucket; - versions.put(selfAddress, localBucket.getVersion()); - } - /** * Returns all the buckets the this node knows about, self owned + remote */ - void receiveGetAllBucket(){ + void receiveGetAllBuckets(){ final ActorRef sender = getSender(); sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); } @@ -153,11 +124,12 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * * @return self owned + remote buckets */ + @SuppressWarnings("rawtypes") Map getAllBuckets(){ Map all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket - all.put(selfAddress, localBucket); + all.put(selfAddress, new BucketImpl<>(localBucket)); //then get all remote buckets all.putAll(remoteBuckets); @@ -170,6 +142,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * * @param members requested members */ + @SuppressWarnings("rawtypes") void receiveGetBucketsByMembers(Set
members){ final ActorRef sender = getSender(); Map buckets = getBucketsByMembers(members); @@ -182,12 +155,13 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * @param members requested members * @return buckets for requested memebers */ + @SuppressWarnings("rawtypes") Map getBucketsByMembers(Set
members) { Map buckets = new HashMap<>(); //first add the local bucket if asked if (members.contains(selfAddress)) { - buckets.put(selfAddress, localBucket); + buckets.put(selfAddress, new BucketImpl<>(localBucket)); } //then get buckets for requested remote nodes @@ -215,8 +189,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering { * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ + @SuppressWarnings({ "rawtypes", "unchecked" }) void receiveUpdateRemoteBuckets(Map receivedBuckets){ - + log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); if (receivedBuckets == null || receivedBuckets.isEmpty()) { return; //nothing to do @@ -229,10 +204,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering { Long localVersion = versions.get(entry.getKey()); if (localVersion == null) { - localVersion = -1L; + localVersion = NO_VERSION; } - Bucket receivedBucket = entry.getValue(); + Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { continue; @@ -240,7 +215,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering { Long remoteVersion = receivedBucket.getVersion(); if (remoteVersion == null) { - remoteVersion = -1L; + remoteVersion = NO_VERSION; } //update only if remote version is newer @@ -249,40 +224,27 @@ public class BucketStore extends AbstractUntypedActorWithMetering { versions.put(entry.getKey(), remoteVersion); } } + if(log.isDebugEnabled()) { log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); } } - /// - ///Getter Setters - /// - - BucketImpl getLocalBucket() { + protected BucketImpl getLocalBucket() { return localBucket; } - void setLocalBucket(BucketImpl localBucket) { - this.localBucket = localBucket; + protected void updateLocalBucket(T data) { + localBucket.setData(data); + versions.put(selfAddress, localBucket.getVersion()); } - ConcurrentMap getRemoteBuckets() { + protected Map> getRemoteBuckets() { return remoteBuckets; } - void setRemoteBuckets(ConcurrentMap remoteBuckets) { - this.remoteBuckets = remoteBuckets; - } - - ConcurrentMap getVersions() { + @VisibleForTesting + Map getVersions() { return versions; } - - void setVersions(ConcurrentMap versions) { - this.versions = versions; - } - - Address getSelfAddress() { - return selfAddress; - } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java index 4e8f2c61c9..b05bd7d0f6 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -9,16 +9,14 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.Address; import com.google.common.base.Preconditions; - import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets; /** @@ -29,46 +27,13 @@ public class Messages { public static class BucketStoreMessages{ - public static class GetLocalBucket implements Serializable { - private static final long serialVersionUID = 1L; - } - - public static class ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - final private Bucket bucket; - - public ContainsBucket(Bucket bucket){ - Preconditions.checkArgument(bucket != null, "bucket can not be null"); - this.bucket = bucket; - } - - public Bucket getBucket(){ - return bucket; - } - - } - - public static class UpdateBucket extends ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - public UpdateBucket(Bucket bucket){ - super(bucket); - } - } - - public static class GetLocalBucketReply extends ContainsBucket implements Serializable { - private static final long serialVersionUID = 1L; - public GetLocalBucketReply(Bucket bucket){ - super(bucket); - } - } - public static class GetAllBuckets implements Serializable { private static final long serialVersionUID = 1L; } public static class GetBucketsByMembers implements Serializable{ private static final long serialVersionUID = 1L; - private Set
members; + private final Set
members; public GetBucketsByMembers(Set
members){ Preconditions.checkArgument(members != null, "members can not be null"); @@ -82,7 +47,7 @@ public class Messages { public static class ContainsBuckets implements Serializable{ private static final long serialVersionUID = 1L; - private Map buckets; + private final Map buckets; public ContainsBuckets(Map buckets){ Preconditions.checkArgument(buckets != null, "buckets can not be null"); @@ -94,11 +59,12 @@ public class Messages { for (Map.Entry entry : buckets.entrySet()){ //ignore null entries - if ( (entry.getKey() == null) || (entry.getValue() == null) ) + if ( (entry.getKey() == null) || (entry.getValue() == null) ) { continue; + } copy.put(entry.getKey(), entry.getValue()); } - return new HashMap<>(copy); + return copy; } } @@ -162,7 +128,7 @@ public class Messages { public static final class GossipStatus extends ContainsBucketVersions implements Serializable{ private static final long serialVersionUID = 1L; - private Address from; + private final Address from; public GossipStatus(Address from, Map versions) { super(versions); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index e0d145dbe1..5b7b7e4fdc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -1,38 +1,41 @@ package org.opendaylight.controller.remote.rpc.registry; -import akka.actor.ActorPath; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import akka.actor.ChildActorPath; +import akka.actor.Address; import akka.actor.Props; -import akka.pattern.Patterns; +import akka.japi.Pair; import akka.testkit.JavaTestKit; -import akka.util.Timeout; -import com.google.common.base.Predicate; +import com.google.common.util.concurrent.Uninterruptibles; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import javax.annotation.Nullable; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.utils.ConditionalProbe; +import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; import org.opendaylight.yangtools.yang.common.QName; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -46,6 +49,8 @@ public class RpcRegistryTest { private ActorRef registry2; private ActorRef registry3; + private int routeIdCounter = 1; + @BeforeClass public static void staticSetup() throws InterruptedException { RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build(); @@ -97,27 +102,30 @@ public class RpcRegistryTest { final JavaTestKit mockBroker = new JavaTestKit(node1); - final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store"); + Address nodeAddress = node1.provider().getDefaultAddress(); // Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef()); - // install probe - final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath, - Messages.BucketStoreMessages.UpdateBucket.class); + List> addedRouteIds = createRouteIds(); - registry1.tell(getAddRouteMessage(), mockBroker.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef()); // Bucket store should get an update bucket message. Updated bucket contains added rpc. - probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); + + Map buckets = retrieveBuckets(registry1, mockBroker, nodeAddress); + verifyBucket(buckets.get(nodeAddress), addedRouteIds); + + Map versions = retrieveVersions(registry1, mockBroker); + Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(), + versions.get(nodeAddress)); // Now remove rpc - registry1.tell(getRemoveRouteMessage(), mockBroker.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef()); // Bucket store should get an update bucket message. Rpc is removed in the updated bucket - probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateBucket.class); + + verifyEmptyBucket(mockBroker, registry1, nodeAddress); System.out.println("testAddRemoveRpcOnSameNode ending"); @@ -136,30 +144,52 @@ public class RpcRegistryTest { System.out.println("testRpcAddRemoveInCluster starting"); final JavaTestKit mockBroker1 = new JavaTestKit(node1); + final JavaTestKit mockBroker2 = new JavaTestKit(node2); + + List> addedRouteIds = createRouteIds(); - // install probe on node2's bucket store - final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store"); - final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath, - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + Address node1Address = node1.provider().getDefaultAddress(); // Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef()); // Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + + Map buckets = retrieveBuckets(registry2, mockBroker2, node1Address); + verifyBucket(buckets.get(node1Address), addedRouteIds); // Now remove - registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef()); + registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef()); - // Bucket store on node2 should get a message to update its local copy of remote buckets - probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + // Bucket store on node2 should get a message to update its local copy of remote buckets. + // Wait for the bucket for node1 to be empty. + + verifyEmptyBucket(mockBroker2, registry2, node1Address); System.out.println("testRpcAddRemoveInCluster ending"); } + private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address) + throws AssertionError { + Map buckets; + int nTries = 0; + while(true) { + buckets = retrieveBuckets(registry1, testKit, address); + + try { + verifyBucket(buckets.get(address), Collections.>emptyList()); + break; + } catch (AssertionError e) { + if(++nTries >= 50) { + throw e; + } + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + } + /** * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated. * @@ -174,76 +204,142 @@ public class RpcRegistryTest { registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef()); - // install probe on node 3 - final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store"); - final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath, - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); - // Add rpc on node 1 + List> addedRouteIds1 = createRouteIds(); registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); - registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); - - probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef()); - // Add same rpc on node 2 + // Add rpc on node 2 + List> addedRouteIds2 = createRouteIds(); registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef()); - registry2.tell(getAddRouteMessage(), mockBroker2.getRef()); + registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef()); + + Address node1Address = node1.provider().getDefaultAddress(); + Address node2Address = node2.provider().getDefaultAddress(); + + Map buckets = retrieveBuckets(registry3, mockBroker3, node1Address, + node2Address); - probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS), - Messages.BucketStoreMessages.UpdateRemoteBuckets.class); + verifyBucket(buckets.get(node1Address), addedRouteIds1); + verifyBucket(buckets.get(node2Address), addedRouteIds2); + + Map versions = retrieveVersions(registry3, mockBroker3); + Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(), + versions.get(node1Address)); + Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(), + versions.get(node2Address)); + + RouteIdentifier routeID = addedRouteIds1.get(0); + registry3.tell(new FindRouters(routeID), mockBroker3.getRef()); + + FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + FindRoutersReply.class); + + List> respList = reply.getRouterWithUpdateTime(); + Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size()); + + respList.get(0).first().tell("hello", ActorRef.noSender()); + mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello"); } - private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) - throws Exception { - final JavaTestKit probe = new JavaTestKit(node); - - ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate() { - @Override - public boolean apply(@Nullable Object input) { - if (input != null) { - return clazz.equals(input.getClass()); - } else { - return false; - } + private Map retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) { + bucketStore.tell(new GetBucketVersions(), testKit.getRef()); + GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetBucketVersionsReply.class); + return reply.getVersions(); + } + + private void verifyBucket(Bucket bucket, List> expRouteIds) { + RoutingTable table = bucket.getData(); + Assert.assertNotNull("Bucket RoutingTable is null", table); + for(RouteIdentifier r: expRouteIds) { + if(!table.contains(r)) { + Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table); } - }); + } - FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); - Timeout timeout = new Timeout(duration); - int maxTries = 30; - int i = 0; - while(true) { - ActorSelection subject = node.actorSelection(subjectPath); - Future future = Patterns.ask(subject, conditionalProbe, timeout); + Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size()); + } - try { - Await.ready(future, duration); - break; - } catch (TimeoutException | InterruptedException e) { - if(++i > maxTries) { - throw e; + private Map retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit, + Address... addresses) { + int nTries = 0; + while(true) { + bucketStore.tell(new GetAllBuckets(), testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), + GetAllBucketsReply.class); + + Map buckets = reply.getBuckets(); + boolean foundAll = true; + for(Address addr: addresses) { + Bucket bucket = buckets.get(addr); + if(bucket == null) { + foundAll = false; + break; } } - } - return probe; + if(foundAll) { + return buckets; + } - } + if(++nTries >= 50) { + Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses) + + ", Actual: " + buckets); + } - private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException { - return new AddOrUpdateRoutes(createRouteIds()); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } } - private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException { - return new RemoveRoutes(createRouteIds()); + @SuppressWarnings("unchecked") + @Test + public void testAddRoutesConcurrency() throws Exception { + final JavaTestKit testKit = new JavaTestKit(node1); + + registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender()); + + final int nRoutes = 500; + final RouteIdentifier[] added = new RouteIdentifier[nRoutes]; + for(int i = 0; i < nRoutes; i++) { + final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, + new QName(new URI("/mockrpc"), "type" + i), null); + added[i] = routeId; + + //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + registry1.tell(new AddOrUpdateRoutes(Arrays.>asList(routeId)), + ActorRef.noSender()); + } + + GetAllBuckets getAllBuckets = new GetAllBuckets(); + FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); + int nTries = 0; + while(true) { + registry1.tell(getAllBuckets, testKit.getRef()); + GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class); + + Bucket localBucket = reply.getBuckets().values().iterator().next(); + RoutingTable table = localBucket.getData(); + if(table != null && table.size() == nRoutes) { + for(RouteIdentifier r: added) { + Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r)); + } + + break; + } + + if(++nTries >= 50) { + Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); + } + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } } private List> createRouteIds() throws URISyntaxException { - QName type = new QName(new URI("/mockrpc"), "mockrpc"); + QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++); List> routeIds = new ArrayList<>(); routeIds.add(new RouteIdentifierImpl(null, type, null)); return routeIds; } - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 78fcbd3a14..ddd08a5f47 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -12,27 +12,23 @@ import akka.actor.Address; import akka.actor.Props; import akka.testkit.TestActorRef; import com.typesafe.config.ConfigFactory; +import java.util.HashMap; +import java.util.Map; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.TerminationMonitor; -import java.util.HashMap; -import java.util.Map; - public class BucketStoreTest { private static ActorSystem system; - private static BucketStore store; @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); - - store = createStore(); } @AfterClass @@ -40,21 +36,6 @@ public class BucketStoreTest { system.shutdown(); } - /** - * Given a new local bucket - * Should replace - */ - @Test - public void testReceiveUpdateBucket(){ - Bucket bucket = new BucketImpl(); - Long expectedVersion = bucket.getVersion(); - - store.receiveUpdateBucket(bucket); - - Assert.assertEquals(bucket, store.getLocalBucket()); - Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion()); - } - /** * Given remote buckets * Should merge with local copy of remote buckets @@ -62,6 +43,8 @@ public class BucketStoreTest { @Test public void testReceiveUpdateRemoteBuckets(){ + BucketStore store = createStore(); + Address localAddress = system.provider().getDefaultAddress(); Bucket localBucket = new BucketImpl(); @@ -84,7 +67,7 @@ public class BucketStoreTest { //Should NOT contain local bucket //Should contain ONLY 3 entries i.e a1, a2, a3 - Map remoteBucketsInStore = store.getRemoteBuckets(); + Map> remoteBucketsInStore = store.getRemoteBuckets(); Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); Assert.assertTrue(remoteBucketsInStore.size() == 3); @@ -122,11 +105,9 @@ public class BucketStoreTest { Assert.assertTrue(remoteBucketsInStore.size() == 4); //Should update versions map - //versions map contains versions for all remote buckets (4) + local bucket - //so it should have total 5. + //versions map contains versions for all remote buckets (4). Map versionsInStore = store.getVersions(); - Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()), - versionsInStore.size() == 5); + Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2)); Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3)); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java index 751a68965d..6124bdf642 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java @@ -24,6 +24,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103. import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener; @@ -53,14 +54,47 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen * Internal {@link TransactionChainListener} joining all DS commits * to Set of chained changes for prevent often DataStore touches. */ - public interface StatDataStoreOperation { + public abstract class StatDataStoreOperation { + public enum StatsManagerOperationType { + /** + * Operation will carry out work related to new node addition / + * update + */ + NODE_UPDATE, + /** + * Operation will carry out work related to node removal + */ + NODE_REMOVAL, + /** + * Operation will commit data to the operational data store + */ + DATA_COMMIT_OPER_DS + } + + private NodeId nodeId; + private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS; + + public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){ + if(operType != null){ + operationType = operType; + } + nodeId = id; + } + + public final StatsManagerOperationType getType() { + return operationType; + } + + public final NodeId getNodeId(){ + return nodeId; + } /** - * Apply all read / write (put|merge) operation - * for DataStore + * Apply all read / write (put|merge) operation for DataStore + * * @param {@link ReadWriteTransaction} tx */ - void applyOperation(ReadWriteTransaction tx); + public abstract void applyOperation(ReadWriteTransaction tx); } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java index e17c45dc76..49fe3bbefd 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation; +import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType; import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -120,7 +121,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit> txContainer = getTransactionCacheContainer(transId, nodeId); @@ -218,6 +219,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit nodeIdent = InstanceIdentifier @@ -157,7 +158,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit nodeIdent = InstanceIdentifier.create(Nodes.class) .child(Node.class, new NodeKey(nodeId)); /* Don't block RPC Notification thread */ - manager.enqueue(new StatDataStoreOperation() { + manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) { @Override public void applyOperation(final ReadWriteTransaction trans) { final Optional> txContainer = getTransactionCacheContainer(transId, nodeId); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java index 53bca87034..2d730645ac 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation; +import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; @@ -81,7 +82,7 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit tableStats = new ArrayList(10); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java index 4870223c0f..20341bcc66 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.md.statistics.manager.impl; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -50,6 +51,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; @@ -174,13 +176,15 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { @Override public void onSuccess(final RpcResult result) { final TransactionId id = result.getResult().getTransactionId(); + final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); if (id == null) { - LOG.warn("No protocol support"); + String[] multipartRequestName = result.getResult().getClass().getSimpleName().split("(?=\\p{Upper})"); + LOG.warn("Node [{}] does not support statistics request type : {}", + nodeKey.getId(),Joiner.on(" ").join(Arrays.copyOfRange(multipartRequestName, 2, multipartRequestName.length-2))); } else { if (resultTransId != null) { resultTransId.set(id); } - final NodeKey nodeKey = nodeRef.getValue().firstKeyOf(Node.class, NodeKey.class); final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId()); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java index 1d03e38c16..437c92f6a0 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java @@ -29,6 +29,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector; import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; +import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; @@ -223,7 +224,18 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { private synchronized void cleanDataStoreOperQueue() { // Drain all events, making sure any blocked threads are unblocked while (! dataStoreOperQueue.isEmpty()) { - dataStoreOperQueue.poll(); + StatDataStoreOperation op = dataStoreOperQueue.poll(); + + // Execute the node removal clean up operation if queued in the + // operational queue. + if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) { + try { + LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId()); + op.applyOperation(null); + } catch (final Exception ex) { + LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId()); + } + } } } diff --git a/pom.xml b/pom.xml index 3bfffaa5d4..1217d72066 100644 --- a/pom.xml +++ b/pom.xml @@ -45,10 +45,7 @@ opendaylight/commons/liblldp - opendaylight/karaf-branding - opendaylight/distribution/opendaylight-karaf-empty - opendaylight/distribution/opendaylight-karaf - opendaylight/distribution/opendaylight-karaf-resources + karaf features