abbbb68265e3209915f32f17e589ff76c4dc2a91
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
23
24 import java.io.InputStream;
25 import java.net.InetSocketAddress;
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35
36 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
38 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
39 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
40 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
42 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
43 import org.opendaylight.controller.sal.core.api.Provider;
44 import org.opendaylight.controller.sal.core.api.RpcImplementation;
45 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
46 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
47 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
48 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
49 import org.opendaylight.protocol.framework.ReconnectStrategy;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
54 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.Node;
56 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
57 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
58 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
59 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
60 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
61 import org.opendaylight.yangtools.yang.model.api.Module;
62 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
65 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
66 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
67 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 import com.google.common.base.Function;
72 import com.google.common.base.Optional;
73 import com.google.common.base.Predicate;
74 import com.google.common.collect.FluentIterable;
75 import com.google.common.collect.Iterables;
76 import com.google.common.util.concurrent.ListenableFuture;
77 import io.netty.util.concurrent.EventExecutor;
78
79 public class NetconfDevice implements Provider, //
80         DataReader<InstanceIdentifier, CompositeNode>, //
81         DataCommitHandler<InstanceIdentifier, CompositeNode>, //
82         RpcImplementation, //
83         AutoCloseable {
84
85     InetSocketAddress socketAddress;
86
87     MountProvisionInstance mountInstance;
88
89     EventExecutor eventExecutor;
90
91     ExecutorService processingExecutor;
92
93     InstanceIdentifier path;
94
95     ReconnectStrategy reconnectStrategy;
96
97     AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
98
99     private NetconfDeviceSchemaContextProvider deviceContextProvider;
100
101     protected Logger logger;
102
103     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
104     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
105     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
106     List<RpcRegistration> rpcReg;
107
108     String name;
109
110     MountProvisionService mountService;
111
112     NetconfClientDispatcher dispatcher;
113
114     static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
115
116     SchemaSourceProvider<InputStream> remoteSourceProvider;
117
118     DataBrokerService dataBroker;
119
120     NetconfDeviceListener listener;
121
122     public NetconfDevice(String name) {
123         this.name = name;
124         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
125         this.path = InstanceIdentifier.builder(INVENTORY_PATH)
126                 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
127     }
128
129     public void start() {
130         checkState(dispatcher != null, "Dispatcher must be set.");
131         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
132         checkState(eventExecutor != null, "Event executor must be set.");
133
134         listener = new NetconfDeviceListener(this);
135
136         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
137
138         dispatcher.createClient(socketAddress, listener, reconnectStrategy);
139     }
140
141     Optional<SchemaContext> getSchemaContext() {
142         if (deviceContextProvider == null) {
143             return Optional.absent();
144         }
145         return deviceContextProvider.currentContext;
146     }
147
148     void bringDown() {
149         if (rpcReg != null) {
150             for (RpcRegistration reg : rpcReg) {
151                 reg.close();
152             }
153             rpcReg = null;
154         }
155         closeGracefully(confReaderReg);
156         confReaderReg = null;
157         closeGracefully(operReaderReg);
158         operReaderReg = null;
159         closeGracefully(commitHandlerReg);
160         commitHandlerReg = null;
161
162         updateDeviceState(false, Collections.<QName> emptySet());
163     }
164
165     private void closeGracefully(final AutoCloseable resource) {
166         if (resource != null) {
167             try {
168                 resource.close();
169             } catch (Exception e) {
170                 logger.warn("Ignoring exception while closing {}", resource, e);
171             }
172         }
173     }
174
175     void bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
176         remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
177         deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
178         deviceContextProvider.createContextFromCapabilities(capabilities);
179         if (mountInstance != null && getSchemaContext().isPresent()) {
180             mountInstance.setSchemaContext(getSchemaContext().get());
181         }
182
183         updateDeviceState(true, capabilities);
184
185         if (mountInstance != null) {
186             confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
187             operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
188             commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
189
190             List<RpcRegistration> rpcs = new ArrayList<>();
191             // TODO same condition twice
192             if (mountInstance != null && getSchemaContext().isPresent()) {
193                 for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
194                     rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), this));
195                 }
196             }
197             rpcReg = rpcs;
198         }
199     }
200
201     private void updateDeviceState(boolean up, Set<QName> capabilities) {
202         DataModificationTransaction transaction = dataBroker.beginTransaction();
203
204         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
205         it.setQName(INVENTORY_NODE);
206         it.addLeaf(INVENTORY_ID, name);
207         it.addLeaf(INVENTORY_CONNECTED, up);
208
209         logger.debug("Client capabilities {}", capabilities);
210         for (QName capability : capabilities) {
211             it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
212         }
213
214         logger.debug("Update device state transaction " + transaction.getIdentifier()
215                 + " putting operational data started.");
216         transaction.removeOperationalData(path);
217         transaction.putOperationalData(path, it.toInstance());
218         logger.debug("Update device state transaction " + transaction.getIdentifier()
219                 + " putting operational data ended.");
220
221         // FIXME: this has to be asynchronous
222         RpcResult<TransactionStatus> transactionStatus = null;
223         try {
224             transactionStatus = transaction.commit().get();
225         } catch (InterruptedException e) {
226             throw new RuntimeException("Interrupted while waiting for response", e);
227         } catch (ExecutionException e) {
228             throw new RuntimeException("Read configuration data " + path + " failed", e);
229         }
230         // TODO better ex handling
231
232         if (transactionStatus.isSuccessful()) {
233             logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
234         } else {
235             logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
236             logger.debug("Update device state transaction status " + transaction.getStatus());
237         }
238     }
239
240     @Override
241     public CompositeNode readConfigurationData(InstanceIdentifier path) {
242         RpcResult<CompositeNode> result = null;
243         try {
244             result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
245                     wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
246         } catch (InterruptedException e) {
247             throw new RuntimeException("Interrupted while waiting for response", e);
248         } catch (ExecutionException e) {
249             throw new RuntimeException("Read configuration data " + path + " failed", e);
250         }
251
252         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
253         return data == null ? null : (CompositeNode) findNode(data, path);
254     }
255
256     @Override
257     public CompositeNode readOperationalData(InstanceIdentifier path) {
258         RpcResult<CompositeNode> result = null;
259         try {
260             result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
261         } catch (InterruptedException e) {
262             throw new RuntimeException("Interrupted while waiting for response", e);
263         } catch (ExecutionException e) {
264             throw new RuntimeException("Read configuration data " + path + " failed", e);
265         }
266
267         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
268         return (CompositeNode) findNode(data, path);
269     }
270
271     @Override
272     public Set<QName> getSupportedRpcs() {
273         return Collections.emptySet();
274     }
275
276     @Override
277     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
278         return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
279     }
280
281     @Override
282     public Collection<ProviderFunctionality> getProviderFunctionality() {
283         return Collections.emptySet();
284     }
285
286     @Override
287     public void onSessionInitiated(ProviderSession session) {
288         dataBroker = session.getService(DataBrokerService.class);
289
290         DataModificationTransaction transaction = dataBroker.beginTransaction();
291         if (operationalNodeNotExisting(transaction)) {
292             transaction.putOperationalData(path, getNodeWithId());
293         }
294         if (configurationNodeNotExisting(transaction)) {
295             transaction.putConfigurationData(path, getNodeWithId());
296         }
297
298         try {
299             transaction.commit().get();
300         } catch (InterruptedException e) {
301             throw new RuntimeException("Interrupted while waiting for response", e);
302         } catch (ExecutionException e) {
303             throw new RuntimeException("Read configuration data " + path + " failed", e);
304         }
305
306         mountService = session.getService(MountProvisionService.class);
307         if (mountService != null) {
308             mountInstance = mountService.createOrGetMountPoint(path);
309         }
310     }
311
312     CompositeNode getNodeWithId() {
313         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
314         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
315     }
316
317     boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
318         return null == transaction.readConfigurationData(path);
319     }
320
321     boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
322         return null == transaction.readOperationalData(path);
323     }
324
325     static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
326
327         Node<?> current = node;
328         for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
329             if (current instanceof SimpleNode<?>) {
330                 return null;
331             } else if (current instanceof CompositeNode) {
332                 CompositeNode currentComposite = (CompositeNode) current;
333
334                 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
335                 if (current == null) {
336                     current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
337                 }
338                 if (current == null) {
339                     current = currentComposite.getFirstSimpleByName(arg.getNodeType());
340                 }
341                 if (current == null) {
342                     current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
343                 }
344                 if (current == null) {
345                     return null;
346                 }
347             }
348         }
349         return current;
350     }
351
352     @Override
353     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
354             DataModification<InstanceIdentifier, CompositeNode> modification) {
355         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
356                 modification, true);
357         try {
358             twoPhaseCommit.prepare();
359         } catch (InterruptedException e) {
360             throw new RuntimeException("Interrupted while waiting for response", e);
361         } catch (ExecutionException e) {
362             throw new RuntimeException("Read configuration data " + path + " failed", e);
363         }
364          return twoPhaseCommit;
365     }
366
367     Set<QName> getCapabilities(Collection<String> capabilities) {
368         return FluentIterable.from(capabilities).filter(new Predicate<String>() {
369             @Override
370             public boolean apply(final String capability) {
371                 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
372             }
373         }).transform(new Function<String, QName>() {
374             @Override
375             public QName apply(final String capability) {
376                 String[] parts = capability.split("\\?");
377                 String namespace = parts[0];
378                 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
379
380                 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
381
382                 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
383
384                 if (revision == null) {
385                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
386                     revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
387
388                     if (revision != null) {
389                         logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
390                     }
391                 }
392                 if (revision == null) {
393                     return QName.create(URI.create(namespace), null, moduleName);
394                 }
395                 return QName.create(namespace, revision, moduleName);
396             }
397
398             private String getStringAndTransform(final Iterable<String> queryParams, final String match,
399                     final String substringToRemove) {
400                 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
401                     @Override
402                     public boolean apply(final String input) {
403                         return input.startsWith(match);
404                     }
405                 });
406
407                 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
408             }
409
410         }).toSet();
411     }
412
413     @Override
414     public void close() {
415         bringDown();
416     }
417
418     public String getName() {
419         return name;
420     }
421
422     public InetSocketAddress getSocketAddress() {
423         return socketAddress;
424     }
425
426     public MountProvisionInstance getMountInstance() {
427         return mountInstance;
428     }
429
430     public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
431         this.reconnectStrategy = reconnectStrategy;
432     }
433
434     public void setProcessingExecutor(final ExecutorService processingExecutor) {
435         this.processingExecutor = processingExecutor;
436     }
437
438     public void setSocketAddress(final InetSocketAddress socketAddress) {
439         this.socketAddress = socketAddress;
440     }
441
442     public void setEventExecutor(final EventExecutor eventExecutor) {
443         this.eventExecutor = eventExecutor;
444     }
445
446     public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
447         this.schemaSourceProvider = schemaSourceProvider;
448     }
449
450     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
451         this.dispatcher = dispatcher;
452     }
453 }
454
455 class NetconfDeviceSchemaContextProvider {
456
457     NetconfDevice device;
458
459     SchemaSourceProvider<InputStream> sourceProvider;
460
461     Optional<SchemaContext> currentContext;
462
463     NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
464         this.device = device;
465         this.sourceProvider = sourceProvider;
466         this.currentContext = Optional.absent();
467     }
468
469     void createContextFromCapabilities(Iterable<QName> capabilities) {
470         YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
471         if (!sourceContext.getMissingSources().isEmpty()) {
472             device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
473         }
474         device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
475         List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
476         if (!sourceContext.getValidSources().isEmpty()) {
477             SchemaContext schemaContext = tryToCreateContext(modelsToParse);
478             currentContext = Optional.fromNullable(schemaContext);
479         } else {
480             currentContext = Optional.absent();
481         }
482         if (currentContext.isPresent()) {
483             device.logger.debug("Schema context successfully created.");
484         }
485     }
486
487     SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
488         YangParserImpl parser = new YangParserImpl();
489         try {
490
491             Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
492             return parser.resolveSchemaContext(models);
493         } catch (Exception e) {
494             device.logger.debug("Error occured during parsing YANG schemas", e);
495             return null;
496         }
497     }
498 }