2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.concurrent.EventExecutor;
import java.io.File;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nonnull;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalFacade;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.NetconfTopology;
import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
import org.opendaylight.netconf.topology.TopologyMountPointFacade;
import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.TimedReconnectStrategy;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.Identifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
87 public class NetconfTopologyImpl implements NetconfTopology, DataTreeChangeListener<Node>, BindingAwareProvider, Provider, AutoCloseable {
89 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyImpl.class);
91 private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
92 private static final int DEFAULT_KEEPALIVE_DELAY = 0;
93 private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
94 private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
95 private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
96 private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
98 private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
99 //keep track of already initialized repositories to avoid adding redundant listeners
100 private static final Set<SchemaRepository> INITIALIZED_SCHEMA_REPOSITORIES = new HashSet<>();
102 private final String topologyId;
103 private final boolean listenForConfigChanges;
104 private final NetconfClientDispatcher clientDispatcher;
105 private final BindingAwareBroker bindingAwareBroker;
106 private final Broker domBroker;
107 private final EventExecutor eventExecutor;
108 private final ScheduledThreadPool keepaliveExecutor;
109 private final ThreadPool processingExecutor;
110 private final SharedSchemaRepository sharedSchemaRepository;
112 private SchemaSourceRegistry schemaRegistry = null;
113 private SchemaContextFactory schemaContextFactory = null;
115 private DOMMountPointService mountPointService = null;
116 private DataBroker dataBroker = null;
117 private final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
119 private ListenerRegistration<NetconfTopologyImpl> listenerRegistration = null;
/**
 * Creates the topology, prepares the schema source cache on the shared schema repository and
 * registers this instance as a provider with both brokers.
 *
 * @param topologyId id of the topology this instance serves
 * @param listenForConfigChanges whether a configuration datastore listener should be registered
 * @param clientDispatcher dispatcher used to open netconf client connections
 * @param bindingAwareBroker binding-aware broker this instance registers with
 * @param domBroker DOM broker this instance registers with
 * @param eventExecutor executor handed to the reconnect strategy
 * @param keepaliveExecutor executor used by keepalive facades
 * @param processingExecutor executor handed to created netconf devices
 * @param schemaRepositoryProvider source of the shared schema repository
 */
public NetconfTopologyImpl(final String topologyId, final boolean listenForConfigChanges, final NetconfClientDispatcher clientDispatcher,
                           final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                           final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
                           final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
    // identity and behaviour switches
    this.topologyId = topologyId;
    this.listenForConfigChanges = listenForConfigChanges;

    // brokers and the client dispatcher
    this.clientDispatcher = clientDispatcher;
    this.bindingAwareBroker = bindingAwareBroker;
    this.domBroker = domBroker;

    // executors
    this.eventExecutor = eventExecutor;
    this.keepaliveExecutor = keepaliveExecutor;
    this.processingExecutor = processingExecutor;

    // schema handling
    this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
    initFilesystemSchemaSourceCache(this.sharedSchemaRepository);

    // registration comes last, once every field above is assigned
    registerToSal(this, this);
}
140 private void registerToSal(BindingAwareProvider baProvider, Provider provider) {
141 domBroker.registerProvider(provider);
142 bindingAwareBroker.registerProvider(baProvider);
146 public void close() throws Exception {
147 // close all existing connectors, delete whole topology in datastore?
148 for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
149 connectorDTO.getCommunicator().disconnect();
151 activeConnectors.clear();
153 if (listenerRegistration != null) {
154 listenerRegistration.close();
155 listenerRegistration = null;
160 public String getTopologyId() {
165 public DataBroker getDataBroker() {
166 return Preconditions.checkNotNull(dataBroker, "DataBroker not initialized yet");
170 public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
171 LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
172 return setupConnection(nodeId, configNode);
176 public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
177 LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
178 if (!activeConnectors.containsKey(nodeId)) {
179 return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
182 // retrieve connection, and disconnect it
183 final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
184 connectorDTO.getCommunicator().close();
185 connectorDTO.getFacade().close();
186 return Futures.immediateFuture(null);
190 public void registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
191 if (activeConnectors.get(node).getFacade() instanceof TopologyMountPointFacade) {
192 ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
194 LOG.warn("Unable to register a connection status listener on a regular salFacade, reconfigure for topology mountpoint facade");
198 private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
199 final Node configNode) {
200 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
202 Preconditions.checkNotNull(netconfNode.getHost());
203 Preconditions.checkNotNull(netconfNode.getPort());
204 Preconditions.checkNotNull(netconfNode.isTcpOnly());
206 final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
207 final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
208 final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
209 final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
211 Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
213 public void onSuccess(NetconfDeviceCapabilities result) {
214 LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
215 activeConnectors.put(nodeId, deviceCommunicatorDTO);
219 public void onFailure(Throwable t) {
220 LOG.error("Connector for : " + nodeId.getValue() + " failed");
221 // remove this node from active connectors?
228 private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
229 final NetconfNode node) {
230 //setup default values since default value is not supported yet in mdsal
231 // TODO remove this when mdsal starts supporting default values
232 final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
233 final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
234 final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
236 IpAddress ipAddress = node.getHost().getIpAddress();
237 InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
238 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
239 node.getPort().getValue());
240 RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
242 // we might need to create a new SalFacade to maintain backwards compatibility with special case loopback connection
243 // TopologyMountPointFacade mountPointFacade =
244 // new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
245 RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
246 new NetconfDeviceSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
247 // new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
249 if (keepaliveDelay > 0) {
250 LOG.warn("Adding keepalive facade, for device {}", nodeId);
251 salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
254 NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
255 new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
257 NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
258 processingExecutor.getExecutor(), reconnectOnChangedSchema);
260 return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
263 public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
265 //setup default values since default value is not supported yet in mdsal
266 // TODO remove this when mdsal starts supporting default values
267 final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
268 final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
269 final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
270 final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
272 final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
274 final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
275 maxConnectionAttempts, betweenAttemptsTimeoutMillis, sleepFactor);
276 final ReconnectStrategy strategy = sf.createReconnectStrategy();
278 final AuthenticationHandler authHandler;
279 final Credentials credentials = node.getCredentials();
280 if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
281 authHandler = new LoginPassword(
282 ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
283 ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
285 throw new IllegalStateException("Only login/password authentification is supported");
288 return NetconfReconnectingClientConfigurationBuilder.create()
289 .withAddress(socketAddress)
290 .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
291 .withReconnectStrategy(strategy)
292 .withAuthHandler(authHandler)
293 .withProtocol(node.isTcpOnly() ?
294 NetconfClientConfiguration.NetconfClientProtocol.TCP :
295 NetconfClientConfiguration.NetconfClientProtocol.SSH)
296 .withConnectStrategyFactory(sf)
297 .withSessionListener(listener)
302 public void onSessionInitiated(ProviderSession session) {
303 mountPointService = session.getService(DOMMountPointService.class);
307 public Collection<ProviderFunctionality> getProviderFunctionality() {
308 return Collections.emptySet();
312 public void onSessionInitiated(ProviderContext session) {
313 dataBroker = session.getSALService(DataBroker.class);
315 if (listenForConfigChanges) {
316 LOG.warn("Registering datastore listener");
317 listenerRegistration =
318 dataBroker.registerDataTreeChangeListener(
319 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(topologyId).child(Node.class)), this);
324 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> collection) {
325 for (DataTreeModification<Node> change : collection) {
326 final DataObjectModification<Node> rootNode = change.getRootNode();
327 switch (rootNode.getModificationType()) {
328 case SUBTREE_MODIFIED:
329 LOG.debug("Config for node {} updated", getNodeId(rootNode.getIdentifier()));
330 disconnectNode(getNodeId(rootNode.getIdentifier()));
331 connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
334 LOG.debug("Config for node {} created", getNodeId(rootNode.getIdentifier()));
335 if (activeConnectors.containsKey(getNodeId(rootNode.getIdentifier()))) {
336 LOG.warn("RemoteDevice{{}} was already configured, reconfiguring..");
337 disconnectNode(getNodeId(rootNode.getIdentifier()));
339 connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
342 LOG.debug("Config for node {} deleted", getNodeId(rootNode.getIdentifier()));
343 disconnectNode(getNodeId(rootNode.getIdentifier()));
349 private void initFilesystemSchemaSourceCache(SharedSchemaRepository repository) {
350 LOG.warn("Schema repository used: {}", repository.getIdentifier());
352 CACHE = new FilesystemSchemaSourceCache<>(repository, YangTextSchemaSource.class, new File("cache/schema"));
354 if (!INITIALIZED_SCHEMA_REPOSITORIES.contains(repository)) {
355 repository.registerSchemaSourceListener(CACHE);
356 repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
357 INITIALIZED_SCHEMA_REPOSITORIES.add(repository);
359 setSchemaRegistry(repository);
360 setSchemaContextFactory(repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT));
363 public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
364 this.schemaRegistry = schemaRegistry;
367 public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
368 this.schemaContextFactory = schemaContextFactory;
371 //TODO this needs to be an util method, since netconf clustering uses this aswell
373 * Determines the Netconf Node Node ID, given the node's instance
376 * @param pathArgument Node's path arument
377 * @return NodeId for the node
379 private NodeId getNodeId(final PathArgument pathArgument) {
380 if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
382 final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
383 if(key instanceof NodeKey) {
384 return ((NodeKey) key).getNodeId();
387 throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
390 private static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
391 final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
392 return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
395 private static final class NetconfConnectorDTO {
397 private final NetconfDeviceCommunicator communicator;
398 private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
400 private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
401 this.communicator = communicator;
402 this.facade = facade;
405 public NetconfDeviceCommunicator getCommunicator() {
409 public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
414 private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
415 private final Long connectionAttempts;
416 private final EventExecutor executor;
417 private final double sleepFactor;
418 private final int minSleep;
420 TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
421 if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
422 connectionAttempts = maxConnectionAttempts;
424 connectionAttempts = null;
427 this.sleepFactor = sleepFactor.doubleValue();
428 this.executor = executor;
429 this.minSleep = minSleep;
433 public ReconnectStrategy createReconnectStrategy() {
434 final Long maxSleep = null;
435 final Long deadline = null;
437 return new TimedReconnectStrategy(executor, minSleep,
438 minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
442 private InetSocketAddress getSocketAddress(final Host host, int port) {
443 if(host.getDomainName() != null) {
444 return new InetSocketAddress(host.getDomainName().getValue(), port);
446 final IpAddress ipAddress = host.getIpAddress();
447 final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
448 return new InetSocketAddress(ip, port);