88ed13ca6de1dc3689db3afc70621d6cf3da84cd
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / AbstractNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.util.concurrent.EventExecutor;
16 import java.io.File;
17 import java.math.BigDecimal;
18 import java.net.InetSocketAddress;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Set;
24 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
25 import org.opendaylight.controller.config.threadpool.ThreadPool;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
31 import org.opendaylight.controller.sal.core.api.Broker;
32 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
33 import org.opendaylight.controller.sal.core.api.Provider;
34 import org.opendaylight.netconf.client.NetconfClientDispatcher;
35 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
36 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
37 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
38 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
39 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
40 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
41 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
42 import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
43 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
44 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
45 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
46 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
47 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
48 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
49 import org.opendaylight.protocol.framework.ReconnectStrategy;
50 import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
51 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
52 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
53 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
63 import org.opendaylight.yangtools.yang.binding.Identifier;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
66 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
67 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
69 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
70 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
71 import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
72 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
73 import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 public abstract class AbstractNetconfTopology implements NetconfTopology, BindingAwareProvider, Provider{
78
79     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);
80
81     private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
82     private static final int DEFAULT_KEEPALIVE_DELAY = 0;
83     private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
84     private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
85     private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
86     private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
87
88     private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
89     //keep track of already initialized repositories to avoid adding redundant listeners
90     private static final Set<SchemaRepository> INITIALIZED_SCHEMA_REPOSITORIES = new HashSet<>();
91
92     protected final String topologyId;
93     private final NetconfClientDispatcher clientDispatcher;
94     protected final BindingAwareBroker bindingAwareBroker;
95     private final Broker domBroker;
96     private final EventExecutor eventExecutor;
97     private final ScheduledThreadPool keepaliveExecutor;
98     private final ThreadPool processingExecutor;
99     private final SharedSchemaRepository sharedSchemaRepository;
100
101     private SchemaSourceRegistry schemaRegistry = null;
102     private SchemaContextFactory schemaContextFactory = null;
103
104     protected DOMMountPointService mountPointService = null;
105     protected DataBroker dataBroker = null;
106     protected final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
107
108     protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
109                                       final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
110                                       final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
111                                       final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
112         this.topologyId = topologyId;
113         this.clientDispatcher = clientDispatcher;
114         this.bindingAwareBroker = bindingAwareBroker;
115         this.domBroker = domBroker;
116         this.eventExecutor = eventExecutor;
117         this.keepaliveExecutor = keepaliveExecutor;
118         this.processingExecutor = processingExecutor;
119         this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
120
121         initFilesystemSchemaSourceCache(sharedSchemaRepository);
122     }
123
124     protected void registerToSal(BindingAwareProvider baProvider, Provider provider) {
125         domBroker.registerProvider(provider);
126         bindingAwareBroker.registerProvider(baProvider);
127     }
128
129     private void initFilesystemSchemaSourceCache(SharedSchemaRepository repository) {
130         LOG.warn("Schema repository used: {}", repository.getIdentifier());
131         if (CACHE == null) {
132             CACHE = new FilesystemSchemaSourceCache<>(repository, YangTextSchemaSource.class, new File("cache/schema"));
133         }
134         if (!INITIALIZED_SCHEMA_REPOSITORIES.contains(repository)) {
135             repository.registerSchemaSourceListener(CACHE);
136             repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
137             INITIALIZED_SCHEMA_REPOSITORIES.add(repository);
138         }
139         setSchemaRegistry(repository);
140         setSchemaContextFactory(repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT));
141     }
142
143     public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
144         this.schemaRegistry = schemaRegistry;
145     }
146
147     public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
148         this.schemaContextFactory = schemaContextFactory;
149     }
150
151     @Override
152     public abstract void onSessionInitiated(ProviderContext session);
153
154     @Override
155     public String getTopologyId() {
156         return topologyId;
157     }
158
159     @Override
160     public DataBroker getDataBroker() {
161         return dataBroker;
162     }
163
164     @Override
165     public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
166         LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
167         return setupConnection(nodeId, configNode);
168     }
169
170     @Override
171     public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
172         LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
173         if (!activeConnectors.containsKey(nodeId)) {
174             return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
175         }
176
177         // retrieve connection, and disconnect it
178         final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
179         connectorDTO.getCommunicator().close();
180         connectorDTO.getFacade().close();
181         return Futures.immediateFuture(null);
182     }
183
184     private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
185                                                                         final Node configNode) {
186         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
187
188         Preconditions.checkNotNull(netconfNode.getHost());
189         Preconditions.checkNotNull(netconfNode.getPort());
190         Preconditions.checkNotNull(netconfNode.isTcpOnly());
191
192         final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
193         final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
194         final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
195         final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
196         activeConnectors.put(nodeId, deviceCommunicatorDTO);
197
198         Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
199             @Override
200             public void onSuccess(NetconfDeviceCapabilities result) {
201                 LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
202             }
203
204             @Override
205             public void onFailure(Throwable t) {
206                 LOG.error("Connector for : " + nodeId.getValue() + " failed");
207                 // remove this node from active connectors?
208             }
209         });
210
211         return future;
212     }
213
214     private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
215                                                          final NetconfNode node) {
216         //setup default values since default value is not supported yet in mdsal
217         // TODO remove this when mdsal starts supporting default values
218         final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
219         final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
220         final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
221
222         IpAddress ipAddress = node.getHost().getIpAddress();
223         InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
224                 ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
225                 node.getPort().getValue());
226         RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
227
228         RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
229                 createSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
230
231         if (keepaliveDelay > 0) {
232             LOG.warn("Adding keepalive facade, for device {}", nodeId);
233             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
234         }
235
236         NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
237                 new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
238
239         NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
240                 processingExecutor.getExecutor(), reconnectOnChangedSchema);
241
242         return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
243     }
244
245     public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
246
247         //setup default values since default value is not supported yet in mdsal
248         // TODO remove this when mdsal starts supporting default values
249         final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
250         final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
251         final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
252         final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
253
254         final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
255
256         final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
257                 maxConnectionAttempts, betweenAttemptsTimeoutMillis, sleepFactor);
258         final ReconnectStrategy strategy = sf.createReconnectStrategy();
259
260         final AuthenticationHandler authHandler;
261         final Credentials credentials = node.getCredentials();
262         if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
263             authHandler = new LoginPassword(
264                     ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
265                     ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
266         } else {
267             throw new IllegalStateException("Only login/password authentification is supported");
268         }
269
270         return NetconfReconnectingClientConfigurationBuilder.create()
271                 .withAddress(socketAddress)
272                 .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
273                 .withReconnectStrategy(strategy)
274                 .withAuthHandler(authHandler)
275                 .withProtocol(node.isTcpOnly() ?
276                         NetconfClientConfiguration.NetconfClientProtocol.TCP :
277                         NetconfClientConfiguration.NetconfClientProtocol.SSH)
278                 .withConnectStrategyFactory(sf)
279                 .withSessionListener(listener)
280                 .build();
281     }
282
283     protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis);
284
285     @Override
286     public abstract ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener);
287
288     @Override
289     public void onSessionInitiated(ProviderSession session) {
290         mountPointService = session.getService(DOMMountPointService.class);
291     }
292
293     @Override
294     public Collection<ProviderFunctionality> getProviderFunctionality() {
295         return Collections.emptySet();
296     }
297
298     //TODO this needs to be an util method, since netconf clustering uses this aswell
299     /**
300      * Determines the Netconf Node Node ID, given the node's instance
301      * identifier.
302      *
303      * @param pathArgument Node's path arument
304      * @return     NodeId for the node
305      */
306     protected NodeId getNodeId(final PathArgument pathArgument) {
307         if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
308
309             final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
310             if(key instanceof NodeKey) {
311                 return ((NodeKey) key).getNodeId();
312             }
313         }
314         throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
315     }
316
317     protected static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
318         final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
319         return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
320     }
321
322     private InetSocketAddress getSocketAddress(final Host host, int port) {
323         if(host.getDomainName() != null) {
324             return new InetSocketAddress(host.getDomainName().getValue(), port);
325         } else {
326             final IpAddress ipAddress = host.getIpAddress();
327             final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
328             return new InetSocketAddress(ip, port);
329         }
330     }
331
332     private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
333         private final Long connectionAttempts;
334         private final EventExecutor executor;
335         private final double sleepFactor;
336         private final int minSleep;
337
338         TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
339             if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
340                 connectionAttempts = maxConnectionAttempts;
341             } else {
342                 connectionAttempts = null;
343             }
344
345             this.sleepFactor = sleepFactor.doubleValue();
346             this.executor = executor;
347             this.minSleep = minSleep;
348         }
349
350         @Override
351         public ReconnectStrategy createReconnectStrategy() {
352             final Long maxSleep = null;
353             final Long deadline = null;
354
355             return new TimedReconnectStrategy(executor, minSleep,
356                     minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
357         }
358     }
359
360     protected static final class NetconfConnectorDTO {
361
362         private final NetconfDeviceCommunicator communicator;
363         private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
364
365         private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
366             this.communicator = communicator;
367             this.facade = facade;
368         }
369
370         public NetconfDeviceCommunicator getCommunicator() {
371             return communicator;
372         }
373
374         public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
375             return facade;
376         }
377     }
378
379 }