2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.util.concurrent.EventExecutor;
17 import java.math.BigDecimal;
18 import java.net.InetSocketAddress;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
24 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
25 import org.opendaylight.controller.config.threadpool.ThreadPool;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
33 import org.opendaylight.controller.sal.core.api.Provider;
34 import org.opendaylight.netconf.client.NetconfClientDispatcher;
35 import org.opendaylight.netconf.client.NetconfClientSessionListener;
36 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
37 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
38 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
39 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
40 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
41 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
42 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
43 import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
44 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
45 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
46 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
47 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
48 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
49 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
50 import org.opendaylight.protocol.framework.ReconnectStrategy;
51 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
52 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
53 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
54 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
64 import org.opendaylight.yangtools.yang.binding.Identifier;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
67 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
69 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
70 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
71 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
72 import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
73 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
74 import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
78 public abstract class AbstractNetconfTopology implements NetconfTopology, BindingAwareProvider, Provider{
80 private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
82 protected static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
83 protected static final int DEFAULT_KEEPALIVE_DELAY = 0;
84 protected static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
85 private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
86 private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
87 private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
88 private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
90 private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
91 //keep track of already initialized repositories to avoid adding redundant listeners
92 private static final Set<SchemaRepository> INITIALIZED_SCHEMA_REPOSITORIES = new HashSet<>();
94 protected final String topologyId;
95 private final NetconfClientDispatcher clientDispatcher;
96 protected final BindingAwareBroker bindingAwareBroker;
97 protected final Broker domBroker;
98 private final EventExecutor eventExecutor;
99 protected final ScheduledThreadPool keepaliveExecutor;
100 protected final ThreadPool processingExecutor;
101 protected final SharedSchemaRepository sharedSchemaRepository;
103 protected SchemaSourceRegistry schemaRegistry = null;
104 protected SchemaContextFactory schemaContextFactory = null;
106 protected DOMMountPointService mountPointService = null;
107 protected DataBroker dataBroker = null;
108 protected final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
/**
 * Stores the supplied collaborators and initializes the schema source cache for the
 * shared schema repository obtained from {@code schemaRepositoryProvider}.
 *
 * @param topologyId identifier of the topology handled by this instance
 * @param clientDispatcher dispatcher used when initializing remote connections
 * @param bindingAwareBroker binding-aware broker used by {@link #registerToSal}
 * @param domBroker DOM broker used by {@link #registerToSal}
 * @param eventExecutor executor handed to the reconnect strategy factory
 * @param keepaliveExecutor thread pool whose executor drives the keepalive facade
 * @param processingExecutor thread pool whose executor is passed to each device
 * @param schemaRepositoryProvider source of the shared schema repository
 */
protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                  final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                  final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
                                  final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
    // Identity and connection plumbing.
    this.topologyId = topologyId;
    this.clientDispatcher = clientDispatcher;

    // Brokers.
    this.bindingAwareBroker = bindingAwareBroker;
    this.domBroker = domBroker;

    // Executors.
    this.eventExecutor = eventExecutor;
    this.keepaliveExecutor = keepaliveExecutor;
    this.processingExecutor = processingExecutor;

    // Schema handling: must come last, it relies on the repository being assigned.
    this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
    initFilesystemSchemaSourceCache(this.sharedSchemaRepository);
}
126 protected void registerToSal(BindingAwareProvider baProvider, Provider provider) {
127 domBroker.registerProvider(provider);
128 bindingAwareBroker.registerProvider(baProvider);
131 private void initFilesystemSchemaSourceCache(SharedSchemaRepository repository) {
132 LOG.warn("Schema repository used: {}", repository.getIdentifier());
134 CACHE = new FilesystemSchemaSourceCache<>(repository, YangTextSchemaSource.class, new File("cache/schema"));
136 if (!INITIALIZED_SCHEMA_REPOSITORIES.contains(repository)) {
137 repository.registerSchemaSourceListener(CACHE);
138 repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
139 INITIALIZED_SCHEMA_REPOSITORIES.add(repository);
141 setSchemaRegistry(repository);
142 setSchemaContextFactory(repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT));
145 public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
146 this.schemaRegistry = schemaRegistry;
149 public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
150 this.schemaContextFactory = schemaContextFactory;
154 public abstract void onSessionInitiated(ProviderContext session);
157 public String getTopologyId() {
162 public DataBroker getDataBroker() {
167 public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
168 LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
169 return setupConnection(nodeId, configNode);
173 public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
174 LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
175 if (!activeConnectors.containsKey(nodeId)) {
176 return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
179 // retrieve connection, and disconnect it
180 final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
181 connectorDTO.getCommunicator().close();
182 connectorDTO.getFacade().close();
183 return Futures.immediateFuture(null);
186 protected ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
187 final Node configNode) {
188 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
190 Preconditions.checkNotNull(netconfNode.getHost());
191 Preconditions.checkNotNull(netconfNode.getPort());
192 Preconditions.checkNotNull(netconfNode.isTcpOnly());
194 final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
195 final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
196 final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
197 final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(netconfClientSessionListener, netconfNode);
198 final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
200 activeConnectors.put(nodeId, deviceCommunicatorDTO);
202 Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
204 public void onSuccess(NetconfDeviceCapabilities result) {
205 LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
209 public void onFailure(Throwable t) {
210 LOG.error("Connector for : " + nodeId.getValue() + " failed");
211 // remove this node from active connectors?
218 protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
219 final NetconfNode node) {
220 //setup default values since default value is not supported yet in mdsal
221 // TODO remove this when mdsal starts supporting default values
222 final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
223 final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
224 final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
226 IpAddress ipAddress = node.getHost().getIpAddress();
227 InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
228 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
229 node.getPort().getValue());
230 RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
232 RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
233 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
235 if (keepaliveDelay > 0) {
236 LOG.warn("Adding keepalive facade, for device {}", nodeId);
237 salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
240 NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
241 new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
243 NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
244 processingExecutor.getExecutor(), reconnectOnChangedSchema);
246 return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
249 public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener, NetconfNode node) {
251 //setup default values since default value is not supported yet in mdsal
252 // TODO remove this when mdsal starts supporting default values
253 final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
254 final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
255 final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
256 final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
258 final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
260 final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
261 maxConnectionAttempts, betweenAttemptsTimeoutMillis, sleepFactor);
262 final ReconnectStrategy strategy = sf.createReconnectStrategy();
264 final AuthenticationHandler authHandler;
265 final Credentials credentials = node.getCredentials();
266 if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
267 authHandler = new LoginPassword(
268 ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
269 ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
271 throw new IllegalStateException("Only login/password authentification is supported");
274 return NetconfReconnectingClientConfigurationBuilder.create()
275 .withAddress(socketAddress)
276 .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
277 .withReconnectStrategy(strategy)
278 .withAuthHandler(authHandler)
279 .withProtocol(node.isTcpOnly() ?
280 NetconfClientConfiguration.NetconfClientProtocol.TCP :
281 NetconfClientConfiguration.NetconfClientProtocol.SSH)
282 .withConnectStrategyFactory(sf)
283 .withSessionListener(listener)
287 protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis);
290 public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
293 public void onSessionInitiated(ProviderSession session) {
294 mountPointService = session.getService(DOMMountPointService.class);
298 public Collection<ProviderFunctionality> getProviderFunctionality() {
299 return Collections.emptySet();
302 //TODO this needs to be an util method, since netconf clustering uses this aswell
304 * Determines the Netconf Node Node ID, given the node's instance
307 * @param pathArgument Node's path arument
308 * @return NodeId for the node
310 protected NodeId getNodeId(final PathArgument pathArgument) {
311 if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
313 final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
314 if(key instanceof NodeKey) {
315 return ((NodeKey) key).getNodeId();
318 throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
321 protected static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
322 final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
323 return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
326 private InetSocketAddress getSocketAddress(final Host host, int port) {
327 if(host.getDomainName() != null) {
328 return new InetSocketAddress(host.getDomainName().getValue(), port);
330 final IpAddress ipAddress = host.getIpAddress();
331 final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
332 return new InetSocketAddress(ip, port);
336 private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
337 private final Long connectionAttempts;
338 private final EventExecutor executor;
339 private final double sleepFactor;
340 private final int minSleep;
342 TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
343 if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
344 connectionAttempts = maxConnectionAttempts;
346 connectionAttempts = null;
349 this.sleepFactor = sleepFactor.doubleValue();
350 this.executor = executor;
351 this.minSleep = minSleep;
355 public ReconnectStrategy createReconnectStrategy() {
356 final Long maxSleep = null;
357 final Long deadline = null;
359 return new TimedReconnectStrategy(executor, minSleep,
360 minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
364 protected static class NetconfConnectorDTO {
366 private final NetconfDeviceCommunicator communicator;
367 private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
369 public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
370 this.communicator = communicator;
371 this.facade = facade;
374 public NetconfDeviceCommunicator getCommunicator() {
378 public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
382 public NetconfClientSessionListener getSessionListener() {