Extract common code from both topology implementations.
Add Netconf specific topology callback implementations.
Change the clustered pipeline slightly as needed for clustering
implementation.
Change-Id: I6f9e424083423cf7491569ada35f4729e3af63f7
Signed-off-by: Tomas Cere <tcere@cisco.com>
<classifier>config</classifier>
<type>xml</type>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-topology-config</artifactId>
+ <classifier>clustered-config</classifier>
+ <type>xml</type>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netconf-tcp</artifactId>
<bundle>mvn:org.opendaylight.netconf/sal-netconf-connector/${controller.mdsal.version}</bundle>
<bundle>mvn:org.opendaylight.controller.model/model-inventory/${controller.mdsal.version}</bundle>
<bundle>mvn:org.opendaylight.netconf/netconf-topology/${netconf.version}</bundle>
+ <bundle>mvn:org.opendaylight.netconf/abstract-topology/${netconf.version}</bundle>
<bundle>mvn:org.opendaylight.netconf/sal-netconf-connector/${netconf.connector.version}</bundle>
<bundle>mvn:org.opendaylight.netconf/netconf-config-dispatcher/${netconf.version}</bundle>
<configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.netconf/netconf-config/${netconf.version}/xml/config</configfile>
- <configfile finalname='${config.configfile.directory}/${config.netconf.topology.configfile}'>mvn:org.opendaylight.netconf/netconf-topology-config/${netconf.version}/xml/config</configfile>
</feature>
<feature name='odl-netconf-connector-ssh' version='${project.version}' description="OpenDaylight :: Netconf Connector :: Netconf Connector + Netconf SSH Server + loopback connection configuration">
<configfile finalname="${config.configfile.directory}/${config.netconf.connector.configfile}">mvn:org.opendaylight.netconf/netconf-connector-config/${netconf.version}/xml/config</configfile>
</feature>
+ <feature name='odl-netconf-topology' version='${project.version}' description="OpenDaylight :: Netconf Topology :: Netconf Connector + Netconf SSH Server + Netconf configuration via config topology datastore">
+ <feature version='${netconf.version}'>odl-netconf-ssh</feature>
+ <feature version='${project.version}'>odl-netconf-connector</feature>
+ <configfile finalname='${config.configfile.directory}/${config.netconf.topology.configfile}'>mvn:org.opendaylight.netconf/netconf-topology-config/${netconf.version}/xml/config</configfile>
+ </feature>
+
+ <feature name='odl-netconf-clustered-topology' version='${project.version}' description="OpenDaylight :: Clustered Netconf Topology :: Netconf Connector + Netconf SSH Server + Clustered Netconf configuration via config topology datastore">
+ <feature version='${netconf.version}'>odl-netconf-ssh</feature>
+ <feature version='${project.version}'>odl-netconf-connector</feature>
+ <configfile finalname='${config.configfile.directory}/${config.netconf.topology.configfile}'>mvn:org.opendaylight.netconf/netconf-topology-config/${netconf.version}/xml/clustered-config</configfile>
+ </feature>
+
</features>
<classifier>config</classifier>
<type>xml</type>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-topology-config</artifactId>
+ <version>${project.version}</version>
+ <classifier>clustered-config</classifier>
+ <type>xml</type>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netconf-client</artifactId>
<type>xml</type>
<classifier>config</classifier>
</artifact>
+ <artifact>
+ <file>${project.build.directory}/classes/initial/02-clustered-netconf-topology.xml</file>
+ <type>xml</type>
+ <classifier>clustered-config</classifier>
+ </artifact>
</artifacts>
</configuration>
</execution>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+ <configuration>
+ <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">prefix:clustered-netconf-topology-impl</type>
+ <name>clustered-netconf-topology</name>
+ <topology-id xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">topology-netconf</topology-id>
+ <event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
+ <name>global-event-executor</name>
+ </event-executor>
+ <binding-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+ <name>binding-osgi-broker</name>
+ </binding-registry>
+ <dom-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-registry>
+ <client-dispatcher xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf">prefix:netconf-client-dispatcher</type>
+ <name>global-netconf-dispatcher</name>
+ </client-dispatcher>
+ <processing-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+ <name>global-netconf-processing-executor</name>
+ </processing-executor>
+ <keepalive-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:scheduled-threadpool</type>
+ <name>global-netconf-ssh-scheduled-executor</name>
+ </keepalive-executor>
+ <shared-schema-repository xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netconf:topology:shared:schema:repository">prefix:shared-schema-repository</type>
+ <name>default-shared-schema-repository</name>
+ </shared-schema-repository>
+ <entity-ownership-service xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:entity-ownership-service">prefix:entity-ownership-service</type>
+ <name>entity-ownership-service</name>
+ </entity-ownership-service>
+ <actor-system-provider-service xmlns="urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service">prefix:actor-system-provider-service</type>
+ <name>actor-system-provider</name>
+ </actor-system-provider-service>
+ </module>
+ </modules>
+
+ <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <service>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netconf:topology">prefix:netconf-topology</type>
+ <instance>
+ <name>clustered-netconf-topology</name>
+ <provider>/modules/module[type='clustered-netconf-topology-impl'][name='clustered-netconf-topology']</provider>
+ </instance>
+ </service>
+ </services>
+ </data>
+ </configuration>
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:netconf:topology?module=netconf-topology&revision=2015-07-27</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology?module=clustered-netconf-topology&revision=2015-11-04</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-entity-ownership-service?module=distributed-entity-ownership-service&revision=2015-08-10</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service?module=actor-system-provider-service&revision=2015-10-05</capability>
+ </required-capabilities>
+</snapshot>
\ No newline at end of file
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netconf:topology">prefix:netconf-topology-impl</type>
<name>default-netconf-topology</name>
<topology-id xmlns="urn:opendaylight:params:xml:ns:yang:controller:netconf:topology">topology-netconf</topology-id>
- <listen-for-config-changes xmlns="urn:opendaylight:params:xml:ns:yang:controller:netconf:topology">
- true
- </listen-for-config-changes>
<event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:netconf:topology">
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
<name>global-event-executor</name>
<groupId>org.opendaylight.netconf</groupId>
<artifactId>sal-netconf-connector</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-clustering-commons</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.netconf</groupId>
+ <artifactId>abstract-topology</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.major.version}.${scala.minor.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.major.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_${scala.major.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-cluster_${scala.major.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-osgi_${scala.major.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <version>${typesafe.config.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>1.6.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
--- /dev/null
+package org.opendaylight.controller.config.yang.clustered.netconf.topology;
+
+import org.opendaylight.netconf.topology.impl.ClusteredNetconfTopology;
+
+public class ClusteredNetconfTopologyModule extends org.opendaylight.controller.config.yang.clustered.netconf.topology.AbstractClusteredNetconfTopologyModule {
+ public ClusteredNetconfTopologyModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public ClusteredNetconfTopologyModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.clustered.netconf.topology.ClusteredNetconfTopologyModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ return new ClusteredNetconfTopology(getTopologyId(),
+ getClientDispatcherDependency(),
+ getBindingRegistryDependency(),
+ getDomRegistryDependency(),
+ getEventExecutorDependency(),
+ getKeepaliveExecutorDependency(),
+ getProcessingExecutorDependency(),
+ getSharedSchemaRepositoryDependency(),
+ getActorSystemProviderServiceDependency().getActorSystem(),
+ getEntityOwnershipServiceDependency());
+ }
+
+}
--- /dev/null
+/*
+* Generated file
+*
+* Generated from: yang module name: clustered-netconf-topology yang module local name: clustered-netconf-topology-impl
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Wed Nov 04 10:59:45 CET 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.clustered.netconf.topology;
+public class ClusteredNetconfTopologyModuleFactory extends org.opendaylight.controller.config.yang.clustered.netconf.topology.AbstractClusteredNetconfTopologyModuleFactory {
+
+}
@Override
public AutoCloseable createInstance() {
- return new NetconfTopologyImpl(getTopologyId(), getListenForConfigChanges(), getClientDispatcherDependency(), getBindingRegistryDependency(),
+ return new NetconfTopologyImpl(getTopologyId(), getClientDispatcherDependency(), getBindingRegistryDependency(),
getDomRegistryDependency(), getEventExecutorDependency(), getKeepaliveExecutorDependency(),
getProcessingExecutorDependency(), getSharedSchemaRepositoryDependency());
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.concurrent.EventExecutor;
+import java.io.File;
+import java.math.BigDecimal;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractNetconfTopology implements NetconfTopology, BindingAwareProvider, Provider{
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
+
+ private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
+ private static final int DEFAULT_KEEPALIVE_DELAY = 0;
+ private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+ private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
+ private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
+ private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+
+ private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
+ //keep track of already initialized repositories to avoid adding redundant listeners
+ private static final Set<SchemaRepository> INITIALIZED_SCHEMA_REPOSITORIES = new HashSet<>();
+
+ protected final String topologyId;
+ private final NetconfClientDispatcher clientDispatcher;
+ protected final BindingAwareBroker bindingAwareBroker;
+ private final Broker domBroker;
+ private final EventExecutor eventExecutor;
+ private final ScheduledThreadPool keepaliveExecutor;
+ private final ThreadPool processingExecutor;
+ private final SharedSchemaRepository sharedSchemaRepository;
+
+ private SchemaSourceRegistry schemaRegistry = null;
+ private SchemaContextFactory schemaContextFactory = null;
+
+ protected DOMMountPointService mountPointService = null;
+ protected DataBroker dataBroker = null;
+ protected final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
+
+ protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
+ final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
+ final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
+ final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
+ this.topologyId = topologyId;
+ this.clientDispatcher = clientDispatcher;
+ this.bindingAwareBroker = bindingAwareBroker;
+ this.domBroker = domBroker;
+ this.eventExecutor = eventExecutor;
+ this.keepaliveExecutor = keepaliveExecutor;
+ this.processingExecutor = processingExecutor;
+ this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
+
+ initFilesystemSchemaSourceCache(sharedSchemaRepository);
+ }
+
+ protected void registerToSal(BindingAwareProvider baProvider, Provider provider) {
+ domBroker.registerProvider(provider);
+ bindingAwareBroker.registerProvider(baProvider);
+ }
+
+ private void initFilesystemSchemaSourceCache(SharedSchemaRepository repository) {
+ LOG.warn("Schema repository used: {}", repository.getIdentifier());
+ if (CACHE == null) {
+ CACHE = new FilesystemSchemaSourceCache<>(repository, YangTextSchemaSource.class, new File("cache/schema"));
+ }
+ if (!INITIALIZED_SCHEMA_REPOSITORIES.contains(repository)) {
+ repository.registerSchemaSourceListener(CACHE);
+ repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
+ INITIALIZED_SCHEMA_REPOSITORIES.add(repository);
+ }
+ setSchemaRegistry(repository);
+ setSchemaContextFactory(repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT));
+ }
+
+ public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
+ this.schemaRegistry = schemaRegistry;
+ }
+
+ public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
+ this.schemaContextFactory = schemaContextFactory;
+ }
+
+ @Override
+ public abstract void onSessionInitiated(ProviderContext session);
+
+ @Override
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ @Override
+ public DataBroker getDataBroker() {
+ return dataBroker;
+ }
+
+ @Override
+ public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
+ LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
+ return setupConnection(nodeId, configNode);
+ }
+
+ @Override
+ public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
+ LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
+ if (!activeConnectors.containsKey(nodeId)) {
+ return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
+ }
+
+ // retrieve connection, and disconnect it
+ final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
+ connectorDTO.getCommunicator().close();
+ connectorDTO.getFacade().close();
+ return Futures.immediateFuture(null);
+ }
+
+ private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
+ final Node configNode) {
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+
+ Preconditions.checkNotNull(netconfNode.getHost());
+ Preconditions.checkNotNull(netconfNode.getPort());
+ Preconditions.checkNotNull(netconfNode.isTcpOnly());
+
+ final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
+ final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
+ final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
+ final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+ activeConnectors.put(nodeId, deviceCommunicatorDTO);
+
+ Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
+ @Override
+ public void onSuccess(NetconfDeviceCapabilities result) {
+ LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Connector for : " + nodeId.getValue() + " failed");
+ // remove this node from active connectors?
+ }
+ });
+
+ return future;
+ }
+
+ private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
+ final NetconfNode node) {
+ //setup default values since default value is not supported yet in mdsal
+ // TODO remove this when mdsal starts supporting default values
+ final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+ final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+ final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+ IpAddress ipAddress = node.getHost().getIpAddress();
+ InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
+ ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+ node.getPort().getValue());
+ RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
+
+ RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
+ createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+
+ if (keepaliveDelay > 0) {
+ LOG.warn("Adding keepalive facade, for device {}", nodeId);
+ salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
+ }
+
+ NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
+ new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+
+ NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
+ processingExecutor.getExecutor(), reconnectOnChangedSchema);
+
+ return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
+ }
+
+ public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
+
+ //setup default values since default value is not supported yet in mdsal
+ // TODO remove this when mdsal starts supporting default values
+ final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+ final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
+ final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
+ final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
+
+ final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
+
+ final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
+ maxConnectionAttempts, betweenAttemptsTimeoutMillis, sleepFactor);
+ final ReconnectStrategy strategy = sf.createReconnectStrategy();
+
+ final AuthenticationHandler authHandler;
+ final Credentials credentials = node.getCredentials();
+ if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
+ authHandler = new LoginPassword(
+ ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
+ ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
+ } else {
+ throw new IllegalStateException("Only login/password authentification is supported");
+ }
+
+ return NetconfReconnectingClientConfigurationBuilder.create()
+ .withAddress(socketAddress)
+ .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
+ .withReconnectStrategy(strategy)
+ .withAuthHandler(authHandler)
+ .withProtocol(node.isTcpOnly() ?
+ NetconfClientConfiguration.NetconfClientProtocol.TCP :
+ NetconfClientConfiguration.NetconfClientProtocol.SSH)
+ .withConnectStrategyFactory(sf)
+ .withSessionListener(listener)
+ .build();
+ }
+
+ protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis);
+
+ @Override
+ public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ mountPointService = session.getService(DOMMountPointService.class);
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ return Collections.emptySet();
+ }
+
+ //TODO this needs to be an util method, since netconf clustering uses this aswell
+ /**
+ * Determines the Netconf Node Node ID, given the node's instance
+ * identifier.
+ *
+ * @param pathArgument Node's path arument
+ * @return NodeId for the node
+ */
+ protected NodeId getNodeId(final PathArgument pathArgument) {
+ if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
+
+ final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
+ if(key instanceof NodeKey) {
+ return ((NodeKey) key).getNodeId();
+ }
+ }
+ throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
+ }
+
+ protected static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+ return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+ }
+
+ private InetSocketAddress getSocketAddress(final Host host, int port) {
+ if(host.getDomainName() != null) {
+ return new InetSocketAddress(host.getDomainName().getValue(), port);
+ } else {
+ final IpAddress ipAddress = host.getIpAddress();
+ final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
+ return new InetSocketAddress(ip, port);
+ }
+ }
+
+ private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
+ private final Long connectionAttempts;
+ private final EventExecutor executor;
+ private final double sleepFactor;
+ private final int minSleep;
+
+ TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
+ if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
+ connectionAttempts = maxConnectionAttempts;
+ } else {
+ connectionAttempts = null;
+ }
+
+ this.sleepFactor = sleepFactor.doubleValue();
+ this.executor = executor;
+ this.minSleep = minSleep;
+ }
+
+ @Override
+ public ReconnectStrategy createReconnectStrategy() {
+ final Long maxSleep = null;
+ final Long deadline = null;
+
+ return new TimedReconnectStrategy(executor, minSleep,
+ minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
+ }
+ }
+
+ protected static final class NetconfConnectorDTO {
+
+ private final NetconfDeviceCommunicator communicator;
+ private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
+
+ private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
+ this.communicator = communicator;
+ this.facade = facade;
+ }
+
+ public NetconfDeviceCommunicator getCommunicator() {
+ return communicator;
+ }
+
+ public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
+ return facade;
+ }
+ }
+
+}
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
ListenableFuture<Void> disconnectNode(NodeId nodeId);
- void registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
+ void registerMountPoint(NodeId nodeId);
+
+ void unregisterMountPoint(NodeId nodeId);
+
+ ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.impl;
+
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedActorExtension;
+import akka.actor.TypedProps;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.Collection;
+import java.util.Collections;
+import javassist.ClassPool;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.AbstractNetconfTopology;
+import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
+import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
+import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.NodeRoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusteredNetconfTopology extends AbstractNetconfTopology implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfTopology.class);
+
+ private final BindingNormalizedNodeCodecRegistry codecRegistry;
+
+ private final ActorSystem actorSystem;
+ private final EntityOwnershipService entityOwnershipService;
+ private TopologyManager topologyManager;
+
+ public ClusteredNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
+ final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
+ final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
+ final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider,
+ final ActorSystem actorSystem, final EntityOwnershipService entityOwnershipService) {
+ super(topologyId, clientDispatcher,
+ bindingAwareBroker, domBroker, eventExecutor,
+ keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
+
+ final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+ moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
+ final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
+ Preconditions.checkState(schemaContextOptional.isPresent());
+ final SchemaContext topologySchemaCtx = schemaContextOptional.get();
+
+ final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
+ codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
+ codecRegistry.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
+
+ this.actorSystem = actorSystem;
+ this.entityOwnershipService = entityOwnershipService;
+ registerToSal(this, this);
+ LOG.warn("Clustered netconf topo started");
+ }
+
+ @Override
+ public void onSessionInitiated(final ProviderContext session) {
+ dataBroker = session.getSALService(DataBroker.class);
+ final NodeWriter writer = new TopologyNodeWriter(topologyId, dataBroker);
+ TypedActorExtension typedActorExtension = TypedActor.get(this.actorSystem);
+ LOG.warn("Registering actor on path {}", actorSystem.name() + "/user/" + topologyId);
+ topologyManager = typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
+ @Override
+ public BaseTopologyManager create() throws Exception {
+ return new BaseTopologyManager(actorSystem,
+ codecRegistry,
+ dataBroker,
+ topologyId,
+ new TopologyCallbackFactory(ClusteredNetconfTopology.this, entityOwnershipService, writer),
+ new NetconfNodeOperationalDataAggregator(),
+ new LoggingSalNodeWriter(writer),
+ new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "topology-netconf", "topology-manager"));
+ }
+ }), topologyId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ // close all existing connectors, delete whole topology in datastore?
+ for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
+ connectorDTO.getCommunicator().disconnect();
+ }
+ activeConnectors.clear();
+ }
+
+ @Override
+ protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
+ return new TopologyMountPointFacade(id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
+ }
+
+ @Override
+ public void registerMountPoint(NodeId nodeId) {
+ ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint();
+ }
+
+ @Override
+ public void unregisterMountPoint(NodeId nodeId) {
+ Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
+ ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
+ }
+
+ @Override
+ public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+ Preconditions.checkState(activeConnectors.containsKey(node), "Need to connect a node before a connection listener can be registered");
+ return ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ return Collections.emptySet();
+ }
+
+ static class TopologyCallbackFactory implements TopologyManagerCallbackFactory {
+
+ private final NetconfTopology netconfTopology;
+ private final EntityOwnershipService entityOwnershipService;
+ private final NodeWriter writer;
+
+ TopologyCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService, final NodeWriter writer) {
+ this.netconfTopology = netconfTopology;
+ this.entityOwnershipService = entityOwnershipService;
+ this.writer = writer;
+ }
+
+ @Override
+ public TopologyManagerCallback create(final ActorSystem actorSystem, final String topologyId) {
+ return new NetconfTopologyManagerCallback(actorSystem, topologyId, new NodeCallbackFactory(netconfTopology, entityOwnershipService), new LoggingSalNodeWriter(writer));
+ }
+ }
+
+ private static class NodeCallbackFactory implements NodeManagerCallbackFactory {
+
+ private final NetconfTopology netconfTopology;
+ private final EntityOwnershipService entityOwnershipService;
+
+ NodeCallbackFactory(final NetconfTopology netconfTopology, final EntityOwnershipService entityOwnershipService) {
+ this.netconfTopology = netconfTopology;
+ this.entityOwnershipService = entityOwnershipService;
+ }
+
+ @Override
+ public NodeManagerCallback create(final String nodeId, final String topologyId, final ActorSystem actorSystem) {
+ return new NetconfNodeManagerCallback(nodeId, topologyId, actorSystem, netconfTopology, new NodeRoleChangeStrategy(entityOwnershipService, "netconf-node", nodeId));
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
+
+ public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
+ @Override
+ public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
+ return new UnavailableCapabilityBuilder()
+ .setCapability(input.getKey().toString())
+ .setFailureReason(input.getValue()).build();
+ }
+ };
+ public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
+ @Override
+ public String apply(QName qName) {
+ // intern string representation of a capability to avoid duplicates
+ return qName.toString().intern();
+ }
+ };
+
+ private static final String UNKNOWN_REASON = "Unknown reason";
+
+ private boolean isMaster = false;
+ private ClusteredNetconfTopology topologyDispatcher;
+ private final ActorSystem actorSystem;
+ private final Cluster clusterExtension;
+
+ private final RoleChangeStrategy roleChangeStrategy;
+
+ private String nodeId;
+ private String topologyId;
+ private TopologyManager topologyManager;
+
+ private Node currentConfig;
+ private Node currentOperationalNode;
+
+ private ConnectionStatusListenerRegistration registration = null;
+
+ public NetconfNodeManagerCallback(final String nodeId,
+ final String topologyId,
+ final ActorSystem actorSystem,
+ final NetconfTopology topologyDispatcher,
+ final RoleChangeStrategy roleChangeStrategy) {
+ this.nodeId = nodeId;
+ this.topologyId = topologyId;
+ this.actorSystem = actorSystem;
+ this.clusterExtension = Cluster.get(actorSystem);
+ this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
+ this.roleChangeStrategy = roleChangeStrategy;
+
+ final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
+ topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
+
+ }
+
+ LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
+ topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
+ }
+ }, actorSystem.dispatcher());
+ }
+
+
+ @Nonnull
+ @Override public Node getInitialState(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+
+ final Node initialNode = new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setConnectionStatus(ConnectionStatus.Connecting)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Lists.newArrayList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Unavailable)
+ .build()))
+ .build())
+ .build())
+ .build();
+
+ if (currentOperationalNode == null) {
+ currentOperationalNode = initialNode;
+ }
+
+ return initialNode;
+ }
+
+ @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+
+ return new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Failed)
+ .build()))
+ .build())
+ .build())
+ .build();
+ }
+
+ @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ this.nodeId = nodeId.getValue();
+ this.currentConfig = configNode;
+ // set initial state before anything happens
+ this.currentOperationalNode = getInitialState(nodeId, configNode);
+
+ // connect magic, send config into the netconf pipeline through topo dispatcher
+ final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
+
+ Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
+ @Override
+ public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
+ registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Connection to device failed", t);
+ }
+ });
+
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+
+ // transform future result into state that gets written into datastore
+ return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
+ @Nullable
+ @Override
+ public Node apply(NetconfDeviceCapabilities input) {
+ // build state data
+ currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Connected)
+ .build()))
+ .build())
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
+ .build()).build();
+ return currentOperationalNode;
+ }
+ });
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ // first disconnect this node
+ topologyDispatcher.unregisterMountPoint(nodeId);
+ registration.close();
+ topologyDispatcher.disconnectNode(nodeId);
+
+ // now reinit this connection with new settings
+ final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
+
+ Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
+ @Override
+ public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
+ registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Connection to device failed", t);
+ }
+ });
+
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+
+ return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
+ @Nullable
+ @Override
+ public Node apply(NetconfDeviceCapabilities input) {
+ // build state data
+ return new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Connected)
+ .build()))
+ .build())
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
+ .build())
+ .build();
+ }
+ });
+ }
+
+ @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
+ // cleanup and disconnect
+ topologyDispatcher.unregisterMountPoint(nodeId);
+ registration.close();
+ roleChangeStrategy.unregisterRoleCandidate();
+ return topologyDispatcher.disconnectNode(nodeId);
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
+ return Futures.immediateFuture(currentOperationalNode);
+ }
+
+ @Override
+ public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
+ if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
+ return;
+ }
+ isMaster = roleChangeDTO.isOwner();
+ //TODO instead of registering mount point, init remote schema repo when its done
+ if (isMaster) {
+ // unregister old mountPoint if ownership changed, register a new one
+ topologyDispatcher.registerMountPoint(new NodeId(nodeId));
+ } else {
+ topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+ }
+ }
+
+ @Override
+ public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
+ // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
+ LOG.debug("onDeviceConnected received, registering role candidate");
+ roleChangeStrategy.registerRoleCandidate(this);
+ List<String> capabilityList = new ArrayList<>();
+ capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
+ capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
+ final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
+ avCapabalitiesBuilder.setAvailableCapability(capabilityList);
+
+ final UnavailableCapabilities unavailableCapabilities =
+ new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
+ .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
+
+ final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
+ currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Connected)
+ .build()))
+ .build())
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setAvailableCapabilities(avCapabalitiesBuilder.build())
+ .setUnavailableCapabilities(unavailableCapabilities)
+ .build())
+ .build();
+ // TODO need to implement forwarding of this msg to master
+ topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
+ }
+
+ @Override
+ public void onDeviceDisconnected() {
+ // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
+ // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
+ LOG.debug("onDeviceDisconnected received, unregistering role candidate");
+ topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+ roleChangeStrategy.unregisterRoleCandidate();
+ final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
+ currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connecting)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Unavailable)
+ .build()))
+ .build())
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .build()).build();
+ // TODO need to implement forwarding of this msg to master
+ topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
+ }
+
+ @Override
+ public void onDeviceFailed(Throwable throwable) {
+ // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
+ // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
+ LOG.debug("onDeviceFailed received");
+ String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
+
+ roleChangeStrategy.unregisterRoleCandidate();
+ currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode(clusterExtension.selfAddress().toString())
+ .setStatus(Status.Failed)
+ .build()))
+ .build())
+ .setConnectedMessage(reason)
+ .build()).build();
+ topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
+ }
+
+
+ @Override
+ public void onNotification(DOMNotification domNotification) {
+ //NOOP
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.impl;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.netconf.topology.StateAggregator;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfNodeOperationalDataAggregator implements StateAggregator{
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeOperationalDataAggregator.class);
+
+ @Override
+ public ListenableFuture<Node> combineCreateAttempts(final List<ListenableFuture<Node>> stateFutures) {
+ final SettableFuture<Node> future = SettableFuture.create();
+ final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
+ @Override
+ public void onSuccess(final List<Node> result) {
+ Node base = null;
+ NetconfNode baseAugmentation = null;
+ final ArrayList<NodeStatus> statusList = new ArrayList<>();
+ for (final Node node : result) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ if (base == null && netconfNode.getConnectionStatus().equals(ConnectionStatus.Connected)) {
+ base = node;
+ baseAugmentation = netconfNode;
+ }
+ LOG.debug(netconfNode.toString());
+ statusList.addAll(netconfNode.getClusteredConnectionStatus().getNodeStatus());
+ }
+
+ if (base == null) {
+ base = result.get(0);
+ baseAugmentation = result.get(0).getAugmentation(NetconfNode.class);
+ LOG.warn("All results {}", result.toString());
+ }
+
+ LOG.warn("Base node: {}", base);
+
+ final Node aggregatedNode =
+ new NodeBuilder(base)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder(baseAugmentation)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(statusList)
+ .build())
+ .build())
+ .build();
+ future.set(aggregatedNode);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("One of the combined create attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Node> combineUpdateAttempts(final List<ListenableFuture<Node>> stateFutures) {
+ final SettableFuture<Node> future = SettableFuture.create();
+ final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
+ @Override
+ public void onSuccess(final List<Node> result) {
+ Node base = null;
+ NetconfNode baseAugmentation = null;
+ final ArrayList<NodeStatus> statusList = new ArrayList<>();
+ for (final Node node : result) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ if (base == null && netconfNode.getConnectionStatus().equals(ConnectionStatus.Connected)) {
+ base = node;
+ baseAugmentation = netconfNode;
+ }
+ LOG.debug(netconfNode.toString());
+ statusList.addAll(netconfNode.getClusteredConnectionStatus().getNodeStatus());
+ }
+
+ if (base == null) {
+ base = result.get(0);
+ baseAugmentation = result.get(0).getAugmentation(NetconfNode.class);
+ LOG.warn("All results {}", result.toString());
+ }
+
+ final Node aggregatedNode =
+ new NodeBuilder(base)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder(baseAugmentation)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(statusList)
+ .build())
+ .build())
+ .build();
+ future.set(aggregatedNode);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("One of the combined update attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> combineDeleteAttempts(final List<ListenableFuture<Void>> stateFutures) {
+ final SettableFuture<Void> future = SettableFuture.create();
+ final ListenableFuture<List<Void>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(final List<Void> result) {
+ future.set(null);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("One of the combined delete attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+}
package org.opendaylight.netconf.topology.impl;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.concurrent.EventExecutor;
-import java.io.File;
-import java.math.BigDecimal;
-import java.net.InetSocketAddress;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
import javax.annotation.Nonnull;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
-import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalFacade;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.AbstractNetconfTopology;
import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
-import org.opendaylight.netconf.topology.TopologyMountPointFacade;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.Identifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
-import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
-import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
-import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
-import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
-import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetconfTopologyImpl implements NetconfTopology, DataTreeChangeListener<Node>, BindingAwareProvider, Provider, AutoCloseable {
+public class NetconfTopologyImpl extends AbstractNetconfTopology implements DataTreeChangeListener<Node>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyImpl.class);
- private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
- private static final int DEFAULT_KEEPALIVE_DELAY = 0;
- private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
- private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
- private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
- private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+ private ListenerRegistration<NetconfTopologyImpl> datastoreListenerRegistration = null;
- private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
- //keep track of already initialized repositories to avoid adding redundant listeners
- private static final Set<SchemaRepository> INITIALIZED_SCHEMA_REPOSITORIES = new HashSet<>();
-
- private final String topologyId;
- private final boolean listenForConfigChanges;
- private final NetconfClientDispatcher clientDispatcher;
- private final BindingAwareBroker bindingAwareBroker;
- private final Broker domBroker;
- private final EventExecutor eventExecutor;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
- private final SharedSchemaRepository sharedSchemaRepository;
-
- private SchemaSourceRegistry schemaRegistry = null;
- private SchemaContextFactory schemaContextFactory = null;
-
- private DOMMountPointService mountPointService = null;
- private DataBroker dataBroker = null;
- private final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
-
- private ListenerRegistration<NetconfTopologyImpl> listenerRegistration = null;
-
- public NetconfTopologyImpl(final String topologyId, final boolean listenForConfigChanges, final NetconfClientDispatcher clientDispatcher,
+ public NetconfTopologyImpl(final String topologyId, final NetconfClientDispatcher clientDispatcher,
final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
- this.topologyId = topologyId;
- this.listenForConfigChanges = listenForConfigChanges;
- this.clientDispatcher = clientDispatcher;
- this.bindingAwareBroker = bindingAwareBroker;
- this.domBroker = domBroker;
- this.eventExecutor = eventExecutor;
- this.keepaliveExecutor = keepaliveExecutor;
- this.processingExecutor = processingExecutor;
- this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
-
- initFilesystemSchemaSourceCache(sharedSchemaRepository);
-
+ super(topologyId, clientDispatcher,
+ bindingAwareBroker, domBroker, eventExecutor,
+ keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
registerToSal(this, this);
}
- private void registerToSal(BindingAwareProvider baProvider, Provider provider) {
- domBroker.registerProvider(provider);
- bindingAwareBroker.registerProvider(baProvider);
- }
-
@Override
public void close() throws Exception {
// close all existing connectors, delete whole topology in datastore?
for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
- connectorDTO.getCommunicator().disconnect();
+ connectorDTO.getCommunicator().close();
}
activeConnectors.clear();
- if (listenerRegistration != null) {
- listenerRegistration.close();
- listenerRegistration = null;
+ if (datastoreListenerRegistration != null) {
+ datastoreListenerRegistration.close();
+ datastoreListenerRegistration = null;
}
}
@Override
- public String getTopologyId() {
- return topologyId;
+ protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
+ return new NetconfDeviceSalFacade(id, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
}
@Override
- public DataBroker getDataBroker() {
- return Preconditions.checkNotNull(dataBroker, "DataBroker not initialized yet");
+ public void registerMountPoint(NodeId nodeId) {
+ throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
}
@Override
- public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
- LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
- return setupConnection(nodeId, configNode);
+ public void unregisterMountPoint(NodeId nodeId) {
+ throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
}
@Override
- public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
- LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
- if (!activeConnectors.containsKey(nodeId)) {
- return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
- }
-
- // retrieve connection, and disconnect it
- final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
- connectorDTO.getCommunicator().close();
- connectorDTO.getFacade().close();
- return Futures.immediateFuture(null);
- }
-
- @Override
- public void registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
- if (activeConnectors.get(node).getFacade() instanceof TopologyMountPointFacade) {
- ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
- } else {
- LOG.warn("Unable to register a connection status listener on a regular salFacade, reconfigure for topology mountpoint facade");
- }
- }
-
- private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
- final Node configNode) {
- final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
-
- Preconditions.checkNotNull(netconfNode.getHost());
- Preconditions.checkNotNull(netconfNode.getPort());
- Preconditions.checkNotNull(netconfNode.isTcpOnly());
-
- final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
- final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
- final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
- final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
-
- Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
- @Override
- public void onSuccess(NetconfDeviceCapabilities result) {
- LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
- activeConnectors.put(nodeId, deviceCommunicatorDTO);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Connector for : " + nodeId.getValue() + " failed");
- // remove this node from active connectors?
- }
- });
-
- return future;
- }
-
- private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
- final NetconfNode node) {
- //setup default values since default value is not supported yet in mdsal
- // TODO remove this when mdsal starts supporting default values
- final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
- final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
- final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
-
- IpAddress ipAddress = node.getHost().getIpAddress();
- InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
- ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
- node.getPort().getValue());
- RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
-
- // we might need to create a new SalFacade to maintain backwards compatibility with special case loopback connection
-// TopologyMountPointFacade mountPointFacade =
-// new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
- RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
- new NetconfDeviceSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
-// new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
-
- if (keepaliveDelay > 0) {
- LOG.warn("Adding keepalive facade, for device {}", nodeId);
- salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
- }
-
- NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
- new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
-
- NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
- processingExecutor.getExecutor(), reconnectOnChangedSchema);
-
- return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
- }
-
- public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
-
- //setup default values since default value is not supported yet in mdsal
- // TODO remove this when mdsal starts supporting default values
- final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
- final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
- final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
- final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
-
- final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
-
- final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
- maxConnectionAttempts, betweenAttemptsTimeoutMillis, sleepFactor);
- final ReconnectStrategy strategy = sf.createReconnectStrategy();
-
- final AuthenticationHandler authHandler;
- final Credentials credentials = node.getCredentials();
- if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
- authHandler = new LoginPassword(
- ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
- ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
- } else {
- throw new IllegalStateException("Only login/password authentification is supported");
- }
-
- return NetconfReconnectingClientConfigurationBuilder.create()
- .withAddress(socketAddress)
- .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
- .withReconnectStrategy(strategy)
- .withAuthHandler(authHandler)
- .withProtocol(node.isTcpOnly() ?
- NetconfClientConfiguration.NetconfClientProtocol.TCP :
- NetconfClientConfiguration.NetconfClientProtocol.SSH)
- .withConnectStrategyFactory(sf)
- .withSessionListener(listener)
- .build();
- }
-
- @Override
- public void onSessionInitiated(ProviderSession session) {
- mountPointService = session.getService(DOMMountPointService.class);
- }
-
- @Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return Collections.emptySet();
+ public ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+ throw new UnsupportedOperationException("Registering a listener on a regular netconf device is not supported(supported only in clustered netconf topology)");
}
@Override
public void onSessionInitiated(ProviderContext session) {
dataBroker = session.getSALService(DataBroker.class);
- if (listenForConfigChanges) {
- LOG.warn("Registering datastore listener");
- listenerRegistration =
- dataBroker.registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(topologyId).child(Node.class)), this);
- }
+ LOG.warn("Registering datastore listener");
+ datastoreListenerRegistration =
+ dataBroker.registerDataTreeChangeListener(
+ new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(topologyId).child(Node.class)), this);
}
@Override
case WRITE:
LOG.debug("Config for node {} created", getNodeId(rootNode.getIdentifier()));
if (activeConnectors.containsKey(getNodeId(rootNode.getIdentifier()))) {
- LOG.warn("RemoteDevice{{}} was already configured, reconfiguring..");
+ LOG.warn("RemoteDevice{{}} was already configured, reconfiguring..", getNodeId(rootNode.getIdentifier()));
disconnectNode(getNodeId(rootNode.getIdentifier()));
}
connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
}
}
- private void initFilesystemSchemaSourceCache(SharedSchemaRepository repository) {
- LOG.warn("Schema repository used: {}", repository.getIdentifier());
- if (CACHE == null) {
- CACHE = new FilesystemSchemaSourceCache<>(repository, YangTextSchemaSource.class, new File("cache/schema"));
- }
- if (!INITIALIZED_SCHEMA_REPOSITORIES.contains(repository)) {
- repository.registerSchemaSourceListener(CACHE);
- repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
- INITIALIZED_SCHEMA_REPOSITORIES.add(repository);
- }
- setSchemaRegistry(repository);
- setSchemaContextFactory(repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT));
- }
-
- public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
- this.schemaRegistry = schemaRegistry;
- }
-
- public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
- this.schemaContextFactory = schemaContextFactory;
- }
-
- //TODO this needs to be an util method, since netconf clustering uses this aswell
- /**
- * Determines the Netconf Node Node ID, given the node's instance
- * identifier.
- *
- * @param pathArgument Node's path arument
- * @return NodeId for the node
- */
- private NodeId getNodeId(final PathArgument pathArgument) {
- if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
-
- final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
- if(key instanceof NodeKey) {
- return ((NodeKey) key).getNodeId();
- }
- }
- throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
- }
-
- private static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
- final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
- return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
- }
-
- private static final class NetconfConnectorDTO {
-
- private final NetconfDeviceCommunicator communicator;
- private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
-
- private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
- this.communicator = communicator;
- this.facade = facade;
- }
-
- public NetconfDeviceCommunicator getCommunicator() {
- return communicator;
- }
-
- public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
- return facade;
- }
- }
-
- private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
- private final Long connectionAttempts;
- private final EventExecutor executor;
- private final double sleepFactor;
- private final int minSleep;
-
- TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
- if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
- connectionAttempts = maxConnectionAttempts;
- } else {
- connectionAttempts = null;
- }
-
- this.sleepFactor = sleepFactor.doubleValue();
- this.executor = executor;
- this.minSleep = minSleep;
- }
-
- @Override
- public ReconnectStrategy createReconnectStrategy() {
- final Long maxSleep = null;
- final Long deadline = null;
-
- return new TimedReconnectStrategy(executor, minSleep,
- minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
- }
- }
-
- private InetSocketAddress getSocketAddress(final Host host, int port) {
- if(host.getDomainName() != null) {
- return new InetSocketAddress(host.getDomainName().getValue(), port);
- } else {
- final IpAddress ipAddress = host.getIpAddress();
- final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
- return new InetSocketAddress(ip, port);
- }
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.util.BaseNodeManager.BaseNodeManagerBuilder;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.netconf.topology.util.NoopRoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.SalNodeWriter;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfTopologyManagerCallback implements TopologyManagerCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManagerCallback.class);
+
+ private final ActorSystem actorSystem;
+ private boolean isMaster;
+
+ private final String topologyId;
+ private final NodeWriter naSalNodeWriter;
+ private final Map<NodeId, NodeManager> nodes = new HashMap<>();
+ private final NodeManagerCallbackFactory nodeHandlerFactory;
+
+ public NetconfTopologyManagerCallback(final ActorSystem actorSystem,
+ final DataBroker dataBroker,
+ final String topologyId,
+ final NodeManagerCallbackFactory nodeHandlerFactory) {
+ this(actorSystem, topologyId, nodeHandlerFactory, new SalNodeWriter(dataBroker, topologyId));
+ }
+
+ public NetconfTopologyManagerCallback(final ActorSystem actorSystem,
+ final String topologyId,
+ final NodeManagerCallbackFactory nodeHandlerFactory,
+ final NodeWriter naSalNodeWriter) {
+ this(actorSystem, topologyId, nodeHandlerFactory, naSalNodeWriter, false);
+
+ }
+
+ public NetconfTopologyManagerCallback(final ActorSystem actorSystem,
+ final String topologyId,
+ final NodeManagerCallbackFactory nodeHandlerFactory,
+ final NodeWriter naSalNodeWriter,
+ boolean isMaster) {
+ this.actorSystem = actorSystem;
+ this.topologyId = topologyId;
+ this.nodeHandlerFactory = nodeHandlerFactory;
+ this.naSalNodeWriter = naSalNodeWriter;
+
+ this.isMaster = isMaster;
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
+
+ // if this node was already configured, and whole config was pushed again, reinit with update
+ if (nodes.containsKey(nodeId)) {
+ return onNodeUpdated(nodeId, node);
+ }
+
+ // Init node admin
+ final NodeManager naBaseNodeManager =
+ createNodeManager(nodeId);
+ nodes.put(nodeId, naBaseNodeManager);
+
+ // put initial state into datastore
+ naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+
+ // trigger connect on this node
+ return naBaseNodeManager.onNodeCreated(nodeId, node);
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
+ // put initial state into datastore
+ naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+
+ // Trigger onNodeUpdated only on this node
+ return nodes.get(nodeId).onNodeUpdated(nodeId, node);
+ }
+
+ @Override
+ public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
+ // Trigger delete only on this node
+ final ListenableFuture<Void> future = nodes.get(nodeId).onNodeDeleted(nodeId);
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ // remove proxy from node list and stop the actor
+ LOG.debug("Stopping node actor for node : {}", nodeId.getValue());
+ final NodeManager remove = nodes.remove(nodeId);
+ TypedActor.get(actorSystem).stop(remove);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // NOOP will be handled on higher level
+ }
+ });
+ return future;
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+ isMaster = roleChangeDTO.isOwner();
+ // our post-election logic
+ }
+
+ private NodeManager createNodeManager(NodeId nodeId) {
+ return new BaseNodeManagerBuilder().setNodeId(nodeId.getValue())
+ .setActorContext(TypedActor.context())
+ .setDelegateFactory(nodeHandlerFactory)
+ .setRoleChangeStrategy(new NoopRoleChangeStrategy())
+ .setTopologyId(topologyId)
+ .build();
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.impl;
+
+import akka.actor.TypedActor;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.List;
+import org.opendaylight.netconf.topology.StateAggregator;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OnlySuccessStateAggregator implements StateAggregator{
+
+ private static final Logger LOG = LoggerFactory.getLogger(OnlySuccessStateAggregator.class);
+
+ @Override
+ public ListenableFuture<Node> combineCreateAttempts(List<ListenableFuture<Node>> stateFutures) {
+ final SettableFuture<Node> future = SettableFuture.create();
+ final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
+ @Override
+ public void onSuccess(List<Node> result) {
+ for (int i = 0; i < result.size() - 1; i++) {
+ if (!result.get(i).equals(result.get(i + 1))) {
+ future.setException(new IllegalStateException("Create futures have different result"));
+ }
+ }
+ future.set(result.get(0));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("One of the combined create attempts failed {}", t);
+ future.setException(t);
+ }
+ }, TypedActor.context().dispatcher());
+
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Node> combineUpdateAttempts(List<ListenableFuture<Node>> stateFutures) {
+ final SettableFuture<Node> future = SettableFuture.create();
+ final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
+ @Override
+ public void onSuccess(List<Node> result) {
+ for (int i = 0; i < result.size() - 1; i++) {
+ if (!result.get(i).equals(result.get(i + 1))) {
+ future.setException(new IllegalStateException("Update futures have different result"));
+ }
+ }
+ future.set(result.get(0));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("One of the combined update attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> combineDeleteAttempts(List<ListenableFuture<Void>> stateFutures) {
+ final SettableFuture<Void> future = SettableFuture.create();
+ final ListenableFuture<List<Void>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(List<Void> result) {
+ future.set(null);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("One of the combined delete attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopologyNodeWriter implements NodeWriter{
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyNodeWriter.class);
+
+ private final String topologyId;
+ private final BindingTransactionChain txChain;
+
+ private final InstanceIdentifier<NetworkTopology> networkTopologyPath;
+ private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
+
+ public TopologyNodeWriter(final String topologyId, final DataBroker dataBroker) {
+ this.topologyId = topologyId;
+ this.txChain = Preconditions.checkNotNull(dataBroker).createTransactionChain(new TransactionChainListener() {
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction, Throwable cause) {
+ LOG.error("{}: TransactionChain({}) {} FAILED!", chain,
+ transaction.getIdentifier(), cause);
+ throw new IllegalStateException("Clustered topology writer TransactionChain(" + chain + ") not committed correctly", cause);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ LOG.trace("Clustered topology writer TransactionChain({}) SUCCESSFUL", chain);
+ }
+ });
+
+ this.networkTopologyPath = InstanceIdentifier.builder(NetworkTopology.class).build();
+ this.topologyListPath = networkTopologyPath.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+ }
+
+ @Override
+ public void init(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
+ final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+ createNetworkTopologyIfNotPresent(writeTx);
+ final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
+
+ LOG.trace("{}: Init device state transaction {} putting if absent operational data started. Putting data on path {}",
+ id.getValue(), writeTx.getIdentifier(), path);
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
+ LOG.trace("{}: Init device state transaction {} putting operational data ended.",
+ id.getValue(), writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "init", id);
+ }
+
+ @Override
+ public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
+ final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+ final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
+ LOG.trace("{}: Update device state transaction {} merging operational data started. Putting data on path {}",
+ id, writeTx.getIdentifier(), operationalDataNode);
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
+ LOG.trace("{}: Update device state transaction {} merging operational data ended.",
+ id, writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "update", id);
+ }
+
+ @Override
+ public void delete(@Nonnull NodeId id) {
+ final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+
+ final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
+
+ LOG.trace(
+ "{}: Close device state transaction {} removing all data started. Path: {}",
+ id, writeTx.getIdentifier(), path);
+ writeTx.delete(LogicalDatastoreType.OPERATIONAL, path);
+ LOG.trace(
+ "{}: Close device state transaction {} removing all data ended.",
+ id, writeTx.getIdentifier());
+
+ commitTransaction(writeTx, "close", id);
+ }
+
+ private void commitTransaction(final WriteTransaction transaction, final String txType, final NodeId id) {
+ LOG.trace("{}: Committing Transaction {}:{}", id.getValue(), txType,
+ transaction.getIdentifier());
+ final CheckedFuture<Void, TransactionCommitFailedException> result = transaction.submit();
+
+ Futures.addCallback(result, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.trace("{}: Transaction({}) {} SUCCESSFUL", id.getValue(), txType,
+ transaction.getIdentifier());
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("{}: Transaction({}) {} FAILED!", id.getValue(), txType,
+ transaction.getIdentifier(), t);
+ throw new IllegalStateException(id.getValue() + " Transaction(" + txType + ") not committed correctly", t);
+ }
+ });
+
+ }
+
+ private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) {
+
+ final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
+ LOG.trace("{}: Merging {} container to ensure its presence", topologyId,
+ NetworkTopology.QNAME, writeTx.getIdentifier());
+ writeTx.merge(LogicalDatastoreType.OPERATIONAL, networkTopologyPath, networkTopology);
+
+ final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
+ LOG.trace("{}: Merging {} container to ensure its presence", topologyId,
+ Topology.QNAME, writeTx.getIdentifier());
+ writeTx.merge(LogicalDatastoreType.OPERATIONAL, topologyListPath, topology);
+ }
+
+ private static InstanceIdentifier<Node> createBindingPathForTopology(final NodeKey key, final String topologyId) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.builder(NetworkTopology.class).build();
+ final KeyedInstanceIdentifier<Topology, TopologyKey> topology = networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+ return topology
+ .child(Node.class, new NodeKey(new NodeId(key.getNodeId().getValue())));
+ }
+
+ private static org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier createBIPathForTopology(final String name, final String topologyId) {
+ final YangInstanceIdentifier.InstanceIdentifierBuilder builder = YangInstanceIdentifier.builder();
+ builder
+ .node(NetworkTopology.QNAME)
+ .node(Topology.QNAME)
+ .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), topologyId)
+ .node(Node.QNAME)
+ .nodeWithKey(Node.QNAME, QName.create(Node.QNAME, "node-id"), name);
+ return builder.build();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider.MountInstance;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusteredNetconfDeviceMountInstanceProxy implements Provider, AutoCloseable{
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDeviceMountInstanceProxy.class);
+
+ private final RemoteDeviceId id;
+ private MountInstance mountInstance;
+
+ public ClusteredNetconfDeviceMountInstanceProxy(final RemoteDeviceId deviceId) {
+ this.id = deviceId;
+ }
+
+ public MountInstance getMountInstance() {
+ Preconditions.checkState(mountInstance != null,
+ "%s: Mount instance was not initialized by sal. Cannot get mount instance", id);
+ return mountInstance;
+ }
+
+ @Override
+ public void close() throws Exception {
+ mountInstance.close();
+ }
+
+ @Override
+ public void onSessionInitiated(final Broker.ProviderSession session) {
+ LOG.debug("{}: (BI)Session with sal established {}", id, session);
+
+ final DOMMountPointService mountService = session.getService(DOMMountPointService.class);
+ if (mountService != null) {
+ mountInstance = new MountInstance(mountService, id);
+ }
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ return Collections.emptySet();
+ }
+}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.netconf.topology;
+package org.opendaylight.netconf.topology.pipeline;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private SchemaContext remoteSchemaContext = null;
private NetconfSessionPreferences netconfSessionPreferences = null;
private DOMRpcService deviceRpc = null;
- private final NetconfDeviceSalProvider salProvider;
+ private final ClusteredNetconfDeviceMountInstanceProxy salProvider;
private final ArrayList<RemoteDeviceHandler<NetconfSessionPreferences>> connectionStatusListeners = new ArrayList<>();
this.domBroker = domBroker;
this.bindingBroker = bindingBroker;
this.defaultRequestTimeoutMillis = defaultRequestTimeoutMillis;
- this.salProvider = new NetconfDeviceSalProvider(id);
- registerToSal(domBroker, bindingBroker);
+ this.salProvider = new ClusteredNetconfDeviceMountInstanceProxy(id);
+ registerToSal(domBroker);
}
- public void registerToSal(final Broker domRegistryDependency, final BindingAwareBroker bindingBroker) {
+ public void registerToSal(final Broker domRegistryDependency) {
domRegistryDependency.registerProvider(salProvider);
- bindingBroker.registerProvider(salProvider);
}
@Override
@Override
public void onDeviceDisconnected() {
- salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ // do not unregister mount point here, this gets handle by the underlying call from role change callback
for (RemoteDeviceHandler<NetconfSessionPreferences> listener : connectionStatusListeners) {
listener.onDeviceDisconnected();
}
@Override
public void onDeviceFailed(Throwable throwable) {
- salProvider.getMountInstance().onTopologyDeviceDisconnected();
+ // do not unregister mount point here, this gets handle by the underlying call from role change callback
for (RemoteDeviceHandler<NetconfSessionPreferences> listener : connectionStatusListeners) {
listener.onDeviceFailed(throwable);
}
public void registerMountPoint() {
Preconditions.checkNotNull(id);
- Preconditions.checkNotNull(remoteSchemaContext);
- Preconditions.checkNotNull(netconfSessionPreferences);
+ Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
+ Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
salProvider.getMountInstance().onTopologyDeviceDisconnected();
}
- public void registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+ public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
connectionStatusListeners.add(listener);
+ return new ConnectionStatusListenerRegistration(listener);
}
@Override
}
}
}
+
+ public class ConnectionStatusListenerRegistration{
+
+ private final RemoteDeviceHandler<NetconfSessionPreferences> listener;
+
+ public ConnectionStatusListenerRegistration(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+ this.listener = listener;
+ }
+
+ public void close() {
+ connectionStatusListeners.remove(listener);
+ }
+ }
}
--- /dev/null
+module clustered-netconf-topology {
+
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:clustered:netconf:topology";
+ prefix "nt";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import threadpool {prefix th;}
+ import netty {prefix netty;}
+ import opendaylight-md-sal-dom {prefix dom;}
+ import opendaylight-md-sal-binding {prefix md-sal-binding; revision-date 2013-10-28;}
+ import odl-netconf-cfg { prefix cfg-net; revision-date 2014-04-08; }
+ import shared-schema-repository { prefix sh; revision-date 2015-07-27; }
+ import netconf-topology { prefix topo; revision-date 2015-07-27; }
+ import opendaylight-entity-ownership-service { prefix eos; revision-date 2015-08-10; }
+ import actor-system-provider-service { prefix asp; revision-date 2015-10-05; }
+
+ description
+ "Module definition for Netconf topolgy. Netconf topology provides a set of common configuration ";
+
+ revision "2015-11-04" {
+ description
+ "Initial revision";
+ }
+
+ identity clustered-netconf-topology-impl {
+ base config:module-type;
+ config:java-name-prefix ClusteredNetconfTopology;
+ config:provided-service topo:netconf-topology;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case clustered-netconf-topology-impl {
+ when "/config:modules/config:module/config:type = 'clustered-netconf-topology-impl'";
+
+ leaf topology-id {
+ mandatory true;
+ type string;
+ }
+
+ container dom-registry {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity dom:dom-broker-osgi-registry;
+ }
+ }
+ }
+
+ container binding-registry {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity md-sal-binding:binding-broker-osgi-registry;
+ }
+ }
+ }
+
+ container event-executor {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity netty:netty-event-executor;
+ }
+ }
+ }
+
+ container processing-executor {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity th:threadpool;
+ }
+ }
+
+ description "Makes up for flaws in netty threading design";
+ }
+
+ container client-dispatcher {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity cfg-net:netconf-client-dispatcher;
+ }
+ }
+ }
+
+ container keepalive-executor {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity th:scheduled-threadpool;
+ }
+ }
+
+ description "Dedicated solely to keepalive execution";
+ }
+
+ container shared-schema-repository {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity sh:shared-schema-repository;
+ }
+ }
+ }
+
+ container entity-ownership-service {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity eos:entity-ownership-service;
+ }
+ }
+ }
+
+ container actor-system-provider-service {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity asp:actor-system-provider-service;
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
type string;
}
- leaf listen-for-config-changes {
- mandatory true;
- type boolean;
- }
-
container dom-registry {
uses config:service-ref {
refine type {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology;
+
+import static com.jayway.awaitility.Awaitility.await;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedActorExtension;
+import akka.actor.TypedProps;
+import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javassist.ClassPool;
+import javax.annotation.Nonnull;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
+import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback;
+import org.opendaylight.netconf.topology.example.ExampleTopologyManagerCallback;
+import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
+import org.opendaylight.netconf.topology.impl.NetconfNodeOperationalDataAggregator;
+import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.NoopRoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ActorTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ActorTest.class);
+
+ private static final String TOPOLOGY_NETCONF = "TopologyNetconf";
+
+ @Mock
+ private EntityOwnershipService entityOwnershipService;
+
+ @Mock
+ private DataBroker dataBroker;
+
+ private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
+
+ static {
+ final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
+ moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
+ final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
+ Preconditions.checkState(schemaContextOptional.isPresent());
+ final SchemaContext topologySchemaCtx = schemaContextOptional.get();
+
+ final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
+ CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
+ CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
+ }
+
+ private static final String PATH_MASTER = "akka.tcp://NetconfNode@127.0.0.1:2552/user/TopologyNetconf";
+ private static final String PATH_SLAVE1 = "akka.tcp://NetconfNode@127.0.0.1:2553/user/TopologyNetconf";
+ private static final String PATH_SLAVE2 = "akka.tcp://NetconfNode@127.0.0.1:2554/user/TopologyNetconf";
+
+ private static final List<String> PATHS_MASTER = Lists.newArrayList(PATH_SLAVE1, PATH_SLAVE2);
+ private static final List<String> PATHS_SLAVE1 = Lists.newArrayList(PATH_MASTER, PATH_SLAVE2);
+ private static final List<String> PATHS_SLAVE2 = Lists.newArrayList(PATH_MASTER, PATH_SLAVE1);
+
+ private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node1"));
+ private static final ActorSystem ACTOR_SYSTEM_SLAVE1 = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node2"));
+ private static final ActorSystem ACTOR_SYSTEM_SLAVE2 = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node3"));
+
+ private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(8);
+
+ private TopologyManager master = null;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ when(dataBroker.registerDataChangeListener(
+ any(LogicalDatastoreType.class),
+ any(InstanceIdentifier.class),
+ any(DataChangeListener.class),
+ any(DataChangeScope.class))).thenReturn(null);
+ }
+
+ private void setMaster(final TopologyManager manager) {
+
+ }
+
+ @Test
+ public void testRealActors() throws Exception {
+
+ EntityOwnershipService topoOwnership = new TestingEntityOwnershipService();
+ // load from config
+ final TopologyManager master = createManagerWithOwnership(ACTOR_SYSTEM, TOPOLOGY_NETCONF, true, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
+ Thread.sleep(1000);
+ final TopologyManager slave1 = createManagerWithOwnership(ACTOR_SYSTEM_SLAVE1, TOPOLOGY_NETCONF, false, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
+ final TopologyManager slave2 = createManagerWithOwnership(ACTOR_SYSTEM_SLAVE2, TOPOLOGY_NETCONF, false, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
+
+ await().atMost(30L, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return master.hasAllPeersUp();
+ }
+ });
+
+ final List<ListenableFuture<Node>> futures = new ArrayList<>();
+ for (int i = 0; i <= 1; i++) {
+ final String nodeid = "testing-node" + i;
+ final Node testingNode = new NodeBuilder()
+ .setNodeId(new NodeId(nodeid))
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(10000 + i))
+ .build())
+ .build();
+ final ListenableFuture<Node> nodeListenableFuture = master.onNodeCreated(new NodeId(nodeid), testingNode);
+ futures.add(nodeListenableFuture);
+ Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ LOG.warn("Node {} created succesfully on all nodes", result.getNodeId().getValue());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("Node creation failed. ", t);
+ }
+ });
+ }
+
+ for (int i = 0; i <= 1; i++) {
+ final String nodeid = "testing-node" + i;
+ final Node testingNode = new NodeBuilder()
+ .setNodeId(new NodeId(nodeid))
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(10000 + i))
+ .build())
+ .build();
+ final ListenableFuture<Node> nodeListenableFuture = master.onNodeUpdated(new NodeId(nodeid), testingNode);
+ futures.add(nodeListenableFuture);
+ Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ LOG.warn("Node {} updated succesfully on all nodes", result.getNodeId().getValue());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("Node update failed. ", t);
+ }
+ });
+ }
+
+
+ final List<ListenableFuture<Void>> deleteFutures = new ArrayList<>();
+ for (int i = 0; i <= 1; i++) {
+ final String nodeid = "testing-node" + i;
+ final ListenableFuture<Void> nodeListenableFuture = master.onNodeDeleted(new NodeId(nodeid));
+ deleteFutures.add(nodeListenableFuture);
+ Futures.addCallback(nodeListenableFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.warn("Node {} succesfully deleted on all nodes", nodeid);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("Node delete failed. ", t);
+ }
+ });
+
+ }
+ LOG.warn("All tasks submitted");
+ Futures.allAsList(futures).get();
+ Futures.allAsList(deleteFutures).get();
+
+ TypedActor.get(ACTOR_SYSTEM).stop(master);
+ TypedActor.get(ACTOR_SYSTEM_SLAVE1).stop(slave1);
+ TypedActor.get(ACTOR_SYSTEM_SLAVE2).stop(slave2);
+
+ }
+
+ // TODO seems like stopping actors is not enough to create an actor with same name, split this into multiple classes?
+ @Ignore
+ @Test
+ public void testWithDummyOwnershipService() throws Exception {
+
+ final TestingEntityOwnershipService ownershipService = new TestingEntityOwnershipService();
+ // load from config
+ final TopologyManager master = createNoopRoleChangeNode(ACTOR_SYSTEM, TOPOLOGY_NETCONF, true, createRealTopoCallbackFactory(ownershipService));
+ final TopologyManager slave1 = createNoopRoleChangeNode(ACTOR_SYSTEM_SLAVE1, TOPOLOGY_NETCONF, false, createRealTopoCallbackFactory(ownershipService));
+ final TopologyManager slave2 = createNoopRoleChangeNode(ACTOR_SYSTEM_SLAVE2, TOPOLOGY_NETCONF, false, createRealTopoCallbackFactory(ownershipService));
+
+ await().atMost(10L, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return master.hasAllPeersUp();
+ }
+ });
+
+ final List<ListenableFuture<Node>> futures = new ArrayList<>();
+ for (int i = 0; i <= 0; i++) {
+ final String nodeid = "testing-node" + i;
+ final Node testingNode = new NodeBuilder()
+ .setNodeId(new NodeId(nodeid))
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(10000 + i))
+ .build())
+ .build();
+ final ListenableFuture<Node> nodeListenableFuture = master.onNodeCreated(new NodeId(nodeid), testingNode);
+ futures.add(nodeListenableFuture);
+ Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ LOG.warn("Node {} created succesfully on all nodes", result.getNodeId().getValue());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("Node creation failed. ", t);
+ }
+ });
+ }
+
+ Futures.allAsList(futures).get();
+ ownershipService.distributeOwnership();
+
+ Thread.sleep(30000);
+ TypedActor.get(ACTOR_SYSTEM).stop(master);
+ TypedActor.get(ACTOR_SYSTEM_SLAVE1).stop(slave1);
+ TypedActor.get(ACTOR_SYSTEM_SLAVE2).stop(slave2);
+ }
+
+ private TopologyManager createNoopRoleChangeNode(final ActorSystem actorSystem, final String topologyId, final boolean isMaster,
+ final TopologyManagerCallbackFactory topologyManagerCallbackFactory) {
+
+ final TypedActorExtension typedActorExtension = TypedActor.get(actorSystem);
+ return typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
+ @Override
+ public BaseTopologyManager create() throws Exception {
+ return new BaseTopologyManager(actorSystem,
+ CODEC_REGISTRY,
+ dataBroker,
+ topologyId,
+ topologyManagerCallbackFactory,
+ new TestingSuccesfulStateAggregator(),
+ new LoggingSalNodeWriter(),
+ new NoopRoleChangeStrategy(),
+ isMaster);
+ }
+ }), TOPOLOGY_NETCONF);
+ }
+
+ private TopologyManager createManagerWithOwnership(final ActorSystem actorSystem, final String topologyId, final boolean isMaster,
+ final TopologyManagerCallbackFactory topologyManagerCallbackFactory, final RoleChangeStrategy roleChangeStrategy) {
+ final TypedActorExtension typedActorExtension = TypedActor.get(actorSystem);
+ return typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
+ @Override
+ public BaseTopologyManager create() throws Exception {
+ return new BaseTopologyManager(actorSystem,
+ CODEC_REGISTRY,
+ dataBroker,
+ topologyId,
+ topologyManagerCallbackFactory,
+ new NetconfNodeOperationalDataAggregator(),
+ new LoggingSalNodeWriter(),
+ roleChangeStrategy,
+ isMaster);
+ }
+ }), TOPOLOGY_NETCONF);
+ }
+
+ private TopologyManagerCallbackFactory createRealTopoTestingNodeCallbackFactory() {
+ final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
+ @Override
+ public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
+ return new LoggingNodeManagerCallback();
+ }
+ };
+
+ return new TopologyManagerCallbackFactory() {
+ @Override
+ public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
+ return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory, new LoggingSalNodeWriter());
+ }
+ };
+ }
+
+ private TopologyManagerCallbackFactory createRealTopoCallbackFactory(final EntityOwnershipService entityOwnershipService) {
+ final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
+ @Override
+ public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
+ return new ExampleNodeManagerCallback();
+ }
+ };
+
+ return new TopologyManagerCallbackFactory() {
+ @Override
+ public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
+ return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory);
+ }
+ };
+ }
+
+ private TopologyManagerCallbackFactory createTestingTopoCallbackFactory() {
+ return new TopologyManagerCallbackFactory() {
+ @Override
+ public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
+ return new TestingTopologyManagerCallback();
+ }
+ };
+ }
+
+ public static class LoggingNodeManagerCallback implements NodeManagerCallback {
+
+ @Nonnull
+ @Override
+ public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+ return new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setConnectionStatus(ConnectionStatus.Connecting)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Lists.newArrayList(
+ new NodeStatusBuilder()
+ .setNode("testing-node")
+ .setStatus(Status.Unavailable)
+ .build()))
+ .build())
+ .build())
+ .build();
+ }
+
+ @Nonnull
+ @Override
+ public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+ return new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setHost(netconfNode.getHost())
+ .setPort(netconfNode.getPort())
+ .setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode("testing-node")
+ .setStatus(Status.Failed)
+ .build()))
+ .build())
+ .build())
+ .build();
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> onNodeCreated(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ LOG.debug("Creating node {} with config {}", nodeId, configNode);
+ final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
+ return Futures.immediateFuture(new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setHost(augmentation.getHost())
+ .setPort(augmentation.getPort())
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode("testing-node")
+ .setStatus(Status.Connected)
+ .build()))
+ .build())
+ .build())
+ .build());
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> onNodeUpdated(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
+ LOG.debug("Updating node {} with config {}", nodeId, configNode);
+ final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
+ return Futures.immediateFuture(new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setHost(augmentation.getHost())
+ .setPort(augmentation.getPort())
+ .setClusteredConnectionStatus(
+ new ClusteredConnectionStatusBuilder()
+ .setNodeStatus(
+ Collections.singletonList(
+ new NodeStatusBuilder()
+ .setNode("testing-node")
+ .setStatus(Status.Connected)
+ .build()))
+ .build())
+ .build())
+ .build());
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Void> onNodeDeleted(@Nonnull NodeId nodeId) {
+ LOG.debug("Deleting node {}", nodeId);
+ return Futures.immediateFuture(null);
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+
+ }
+ }
+
+ public static class TestingTopologyManagerCallback implements TopologyManagerCallback {
+
+ public TestingTopologyManagerCallback() {
+
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeCreated(NodeId nodeId, Node node) {
+ LOG.warn("Actor system that called this: {}", TypedActor.context().system().settings().toString());
+ return Futures.immediateFuture(new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(2555))
+ .build())
+ .build());
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeUpdated(NodeId nodeId, Node node) {
+ LOG.warn("Actor system that called this: {}", TypedActor.context().system().settings().toString());
+ LOG.debug("Update called on node {}, with config {}", nodeId.getValue(), node);
+ return Futures.immediateFuture(new NodeBuilder()
+ .setNodeId(nodeId)
+ .addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder()
+ .setConnectionStatus(ConnectionStatus.Connected)
+ .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+ .setPort(new PortNumber(65535))
+ .build())
+ .build());
+ }
+
+ @Override
+ public ListenableFuture<Void> onNodeDeleted(NodeId nodeId) {
+ LOG.debug("Delete called on node {}", nodeId.getValue());
+ return Futures.immediateFuture(null);
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+
+ }
+ }
+
+ public class TestingSuccesfulStateAggregator implements StateAggregator {
+
+ @Override
+ public ListenableFuture<Node> combineCreateAttempts(List<ListenableFuture<Node>> stateFutures) {
+ final SettableFuture<Node> future = SettableFuture.create();
+ final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
+ @Override
+ public void onSuccess(List<Node> result) {
+ for (int i = 0; i < result.size() - 1; i++) {
+ if (!result.get(i).equals(result.get(i + 1))) {
+ LOG.warn("Node 1 {}: {}", result.get(i).getClass(), result.get(i));
+ LOG.warn("Node 2 {}: {}", result.get(i + 1).getClass(), result.get(i + 1));
+ future.setException(new IllegalStateException("Create futures have different result"));
+ LOG.warn("Future1 : {} Future2 : {}", result.get(i), result.get(i+1));
+ }
+ }
+ future.set(result.get(0));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("One of the combined create attempts failed {}", t);
+ future.setException(t);
+ }
+ }, TypedActor.context().dispatcher());
+
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Node> combineUpdateAttempts(List<ListenableFuture<Node>> stateFutures) {
+ final SettableFuture<Node> future = SettableFuture.create();
+ final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
+ @Override
+ public void onSuccess(List<Node> result) {
+ for (int i = 0; i < result.size() - 1; i++) {
+ if (!result.get(i).equals(result.get(i + 1))) {
+ future.setException(new IllegalStateException("Update futures have different result"));
+ }
+ }
+ future.set(result.get(0));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("One of the combined update attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> combineDeleteAttempts(List<ListenableFuture<Void>> stateFutures) {
+ final SettableFuture<Void> future = SettableFuture.create();
+ final ListenableFuture<List<Void>> allAsList = Futures.allAsList(stateFutures);
+ Futures.addCallback(allAsList, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(List<Void> result) {
+ future.set(null);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("One of the combined delete attempts failed {}", t);
+ future.setException(t);
+ }
+ });
+ return future;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestingEntityOwnershipService implements EntityOwnershipService{
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestingEntityOwnershipService.class);
+
+ private final List<EntityOwnershipListener> listeners = new ArrayList<>();
+ private final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ private Entity entity;
+ private boolean masterSet = false;
+
+ @Override
+ public EntityOwnershipCandidateRegistration registerCandidate(final Entity entity) throws CandidateAlreadyRegisteredException {
+ LOG.warn("Registering Candidate");
+ this.entity = entity;
+ return new EntityOwnershipCandidateRegistration() {
+ @Override
+ public void close() {
+ LOG.debug("Closing candidate registration");
+ }
+
+ @Override
+ public Entity getInstance() {
+ return entity;
+ }
+ };
+ }
+
+ @Override
+ public EntityOwnershipListenerRegistration registerListener(final String entityType, final EntityOwnershipListener listener) {
+ listeners.add(listener);
+ if (listeners.size() == 3) {
+ distributeOwnership();
+ }
+ return new EntityOwnershipListenerRegistration() {
+ @Nonnull
+ @Override
+ public String getEntityType() {
+ return entityType;
+ }
+
+ @Override
+ public void close() {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public EntityOwnershipListener getInstance() {
+ return listener;
+ }
+ };
+ }
+
+ @Override
+ public Optional<EntityOwnershipState> getOwnershipState(final Entity forEntity) {
+ return null;
+ }
+
+ @Override
+ public boolean isCandidateRegistered(@Nonnull Entity entity) {
+ return true;
+ }
+
+ public void distributeOwnership() {
+ LOG.debug("Distributing ownership");
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ masterSet = false;
+ LOG.debug("Distributing ownership for {} listeners", listeners.size());
+ for (final EntityOwnershipListener listener : listeners) {
+ if (!masterSet) {
+ listener.ownershipChanged(new EntityOwnershipChange(entity, false, true, true));
+ masterSet = true;
+ } else {
+ listener.ownershipChanged(new EntityOwnershipChange(entity, false, false, true));
+ }
+ }
+
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestingTopologyDispatcher implements NetconfTopology{
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestingTopologyDispatcher.class);
+
+ private final String topologyId;
+
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final Set<NodeId> connected = new HashSet<>();
+ private final Map<NodeId, RemoteDeviceHandler<NetconfSessionPreferences>> listeners = new HashMap<>();
+
+
+ public TestingTopologyDispatcher(final String topologyId) {
+
+ this.topologyId = topologyId;
+ }
+
+ @Override
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ @Override
+ public DataBroker getDataBroker() {
+ return null;
+ }
+
+ // log the current connection attempt and return a successful future asynchronously
+ @Override
+ public ListenableFuture<NetconfDeviceCapabilities> connectNode(final NodeId nodeId, final Node configNode) {
+ final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
+ LOG.debug("Connecting node {}, with config: {} ", nodeId.getValue(),
+ augmentation.getHost().getIpAddress().toString() + ":" + augmentation.getPort());
+ connected.add(nodeId);
+ final SettableFuture<NetconfDeviceCapabilities> future = SettableFuture.create();
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(4000);
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ future.set(new NetconfDeviceCapabilities());
+ }
+ });
+ } catch (InterruptedException e) {
+ LOG.error("Cannot sleep thread", e);
+ }
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> disconnectNode(final NodeId nodeId) {
+ Preconditions.checkState(connected.contains(nodeId), "Node is not connected yet");
+ LOG.debug("Disconnecting node {}", nodeId.getValue());
+ final SettableFuture<Void> future = SettableFuture.create();
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(4000);
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ connected.remove(nodeId);
+ future.set(null);
+ }
+ });
+ } catch (InterruptedException e) {
+ LOG.error("Cannot sleep thread", e);
+ }
+
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void registerMountPoint(NodeId nodeId) {
+ LOG.debug("Registering mount point for node {}", nodeId.getValue());
+
+ }
+
+ @Override
+ public void unregisterMountPoint(NodeId nodeId) {
+ LOG.debug("Unregistering mount point for node {}", nodeId.getValue());
+ }
+
+ @Override
+ public ConnectionStatusListenerRegistration registerConnectionStatusListener(final NodeId node, final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+ Preconditions.checkState(connected.contains(node), "Node is not connected yet");
+
+ LOG.debug("Registering a connection status listener for node {}", node.getValue());
+ listeners.put(node, listener);
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(10000);
+
+ boolean up = false;
+ for (int i = 0; i < 20; i++) {
+ if (up) {
+ LOG.debug("Device has connected {}", node.getValue());
+ listener.onDeviceConnected(null, null, null);
+ up = false;
+ } else {
+ LOG.debug("Device has diconnected {}", node.getValue());
+ listener.onDeviceDisconnected();
+ up = true;
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ LOG.error("Cannot sleep thread", e);
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+ });
+
+ return null;
+ }
+}
--- /dev/null
+include "test.conf"
+
+akka {
+ # LISTEN on tcp port 2552
+ remote.netty.tcp.port = 2552
+
+ cluster {
+ seed-nodes = [
+ "akka.tcp://NetconfNode@127.0.0.1:2553",
+ "akka.tcp://NetconfNode@127.0.0.1:2554"]
+
+ auto-down-unreachable-after = 10s
+ }
+}
--- /dev/null
+include "test.conf"
+
+akka {
+ # LISTEN on tcp port 2553
+ remote.netty.tcp.port = 2553
+
+ cluster {
+ seed-nodes = [
+ "akka.tcp://NetconfNode@127.0.0.1:2552",
+ "akka.tcp://NetconfNode@127.0.0.1:2554"]
+
+ auto-down-unreachable-after = 10s
+ }
+}
--- /dev/null
+include "test.conf"
+
+akka {
+ # LISTEN on tcp port 2554
+ remote.netty.tcp.port = 2554
+
+ cluster {
+ seed-nodes = [
+ "akka.tcp://NetconfNode@127.0.0.1:2552",
+ "akka.tcp://NetconfNode@127.0.0.1:2553"]
+
+ auto-down-unreachable-after = 10s
+ }
+}
--- /dev/null
+akka {
+
+ version = "2.3.14"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "[B" = bytes
+ "java.io.Serializable" = java
+ }
+ }
+
+ remote {
+ enabled-transports = ["akka.remote.netty.tcp"]
+ netty.tcp {
+ hostname = "127.0.0.1"
+ }
+ }
+
+}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class NetconfDeviceSalProvider implements AutoCloseable, Provider, BindingAwareProvider {
+public class NetconfDeviceSalProvider implements AutoCloseable, Provider, BindingAwareProvider {
private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceSalProvider.class);
private ObjectRegistration<DOMMountPoint> topologyRegistration;
- MountInstance(final DOMMountPointService mountService, final RemoteDeviceId id) {
+ public MountInstance(final DOMMountPointService mountService, final RemoteDeviceId id) {
this.mountService = Preconditions.checkNotNull(mountService);
this.id = Preconditions.checkNotNull(id);
}
}
leaf status {
type enumeration {
- enum successful;
+ enum connected;
+ enum unavailable;
enum failed;
}
}
<restconf.version>1.3.0-SNAPSHOT</restconf.version>
<protocol-framework.version>0.7.0-SNAPSHOT</protocol-framework.version>
<sshd-core.version>0.14.0</sshd-core.version>
- <scala.major.version>2.11</scala.major.version>
- <scala.minor.version>5</scala.minor.version>
+ <scala.major.version>2.10</scala.major.version>
+ <scala.minor.version>4</scala.minor.version>
<surefire.version>2.15</surefire.version>
<typesafe.config.version>1.2.1</typesafe.config.version>
<yangtools.version>0.8.0-SNAPSHOT</yangtools.version>