BUG-732 Fix netconf-connector unable to detect monitoring capability
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf
9
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
15 import java.net.URI
16 import java.util.ArrayList
17 import java.util.Collection
18 import java.util.Collections
19 import java.util.List
20 import java.util.Set
21 import java.util.concurrent.ExecutorService
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
23 import org.opendaylight.controller.md.sal.common.api.data.DataModification
24 import org.opendaylight.controller.md.sal.common.api.data.DataReader
25 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
26 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
27 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
28 import org.opendaylight.controller.sal.core.api.Provider
29 import org.opendaylight.controller.sal.core.api.RpcImplementation
30 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
31 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
32 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
34 import org.opendaylight.protocol.framework.ReconnectStrategy
35 import org.opendaylight.yangtools.concepts.Registration
36 import org.opendaylight.yangtools.yang.common.QName
37 import org.opendaylight.yangtools.yang.data.api.CompositeNode
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
39 import org.opendaylight.yangtools.yang.data.api.Node
40 import org.opendaylight.yangtools.yang.data.api.SimpleNode
41 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
42 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
43 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext
45 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
46 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
47 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
48 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
49 import org.slf4j.Logger
50 import org.slf4j.LoggerFactory
51
52 import static com.google.common.base.Preconditions.*
53 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
54
55 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
56
57 class NetconfDevice implements Provider, //
58 DataReader<InstanceIdentifier, CompositeNode>, //
59 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
60 RpcImplementation, //
61 AutoCloseable {
62
63     @Property
64     var InetSocketAddress socketAddress;
65
66     @Property
67     var MountProvisionInstance mountInstance;
68
69     @Property
70     var EventExecutor eventExecutor;
71
72     @Property
73     var ExecutorService processingExecutor;
74
75     @Property
76     var InstanceIdentifier path;
77
78     @Property
79     var ReconnectStrategy reconnectStrategy;
80
81     @Property
82     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
83
84     @Property
85     private NetconfDeviceSchemaContextProvider deviceContextProvider
86
87     protected val Logger logger
88
89     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
90     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
91     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
92     List<RpcRegistration> rpcReg
93
94     @Property
95     val String name
96
97     MountProvisionService mountService
98
99     @Property
100     var NetconfClientDispatcher dispatcher
101
102     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
103
104     @Property
105     var SchemaSourceProvider<InputStream> remoteSourceProvider
106
107     DataBrokerService dataBroker
108
109     var NetconfDeviceListener listener;
110
111     public new(String name) {
112         this._name = name;
113         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
114         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
115             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
116     }
117
118     def start() {
119         checkState(dispatcher != null, "Dispatcher must be set.");
120         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
121         checkState(eventExecutor != null, "Event executor must be set.");
122
123         listener = new NetconfDeviceListener(this);
124
125         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
126
127         dispatcher.createClient(socketAddress, listener, reconnectStrategy);
128     }
129
130     def Optional<SchemaContext> getSchemaContext() {
131         if (deviceContextProvider == null) {
132             return Optional.absent();
133         }
134         return deviceContextProvider.currentContext;
135     }
136
137     def bringDown() {
138         if (rpcReg != null) {
139             for (reg : rpcReg) {
140                 reg.close()
141             }
142             rpcReg = null
143         }
144         confReaderReg?.close()
145         confReaderReg = null
146         operReaderReg?.close()
147         operReaderReg = null
148         commitHandlerReg?.close()
149         commitHandlerReg = null
150
151         updateDeviceState(false, Collections.emptySet())
152     }
153
154     def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
155         remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
156         deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
157         deviceContextProvider.createContextFromCapabilities(capabilities);
158         if (mountInstance != null && schemaContext.isPresent) {
159             mountInstance.schemaContext = schemaContext.get();
160         }
161
162         updateDeviceState(true, capabilities)
163
164         if (mountInstance != null) {
165             confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
166             operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
167             commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
168
169             val rpcs = new ArrayList<RpcRegistration>();
170             if (mountInstance != null && schemaContext.isPresent) {
171                 for (rpc : mountInstance.schemaContext.operations) {
172                     rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
173                 }
174             }
175             rpcReg = rpcs
176         }
177     }
178
179     private def updateDeviceState(boolean up, Set<QName> capabilities) {
180         val transaction = dataBroker.beginTransaction
181
182         val it = ImmutableCompositeNode.builder
183         setQName(INVENTORY_NODE)
184         addLeaf(INVENTORY_ID, name)
185         addLeaf(INVENTORY_CONNECTED, up)
186
187         logger.debug("Client capabilities {}", capabilities)
188         for (capability : capabilities) {
189             addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
190         }
191
192         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
193         transaction.removeOperationalData(path)
194         transaction.putOperationalData(path, it.toInstance)
195         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
196
197         // FIXME: this has to be asynchronous
198         val transactionStatus = transaction.commit.get;
199
200         if (transactionStatus.successful) {
201             logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
202         } else {
203             logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
204             logger.debug("Update device state transaction status " + transaction.status)
205         }
206     }
207
208     override readConfigurationData(InstanceIdentifier path) {
209         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
210             wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
211         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
212         return data?.findNode(path) as CompositeNode;
213     }
214
215     override readOperationalData(InstanceIdentifier path) {
216         val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
217         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
218         return data?.findNode(path) as CompositeNode;
219     }
220
221     override getSupportedRpcs() {
222         Collections.emptySet;
223     }
224
225     override invokeRpc(QName rpc, CompositeNode input) {
226         return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
227     }
228
229     override getProviderFunctionality() {
230         Collections.emptySet
231     }
232
233     override onSessionInitiated(ProviderSession session) {
234         dataBroker = session.getService(DataBrokerService);
235
236         val transaction = dataBroker.beginTransaction
237         if (transaction.operationalNodeNotExisting) {
238             transaction.putOperationalData(path, nodeWithId)
239         }
240         if (transaction.configurationNodeNotExisting) {
241             transaction.putConfigurationData(path, nodeWithId)
242         }
243         transaction.commit().get();
244         mountService = session.getService(MountProvisionService);
245         mountInstance = mountService?.createOrGetMountPoint(path);
246     }
247
248     def getNodeWithId() {
249         val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
250         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
251     }
252
253     def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
254         return null === transaction.readConfigurationData(path);
255     }
256
257     def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
258         return null === transaction.readOperationalData(path);
259     }
260
261     static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
262
263         var Node<?> current = node;
264         for (arg : identifier.path) {
265             if (current instanceof SimpleNode<?>) {
266                 return null;
267             } else if (current instanceof CompositeNode) {
268                 val currentComposite = (current as CompositeNode);
269
270                 current = currentComposite.getFirstCompositeByName(arg.nodeType);
271                 if(current == null) {
272                     current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
273                 }
274                 if(current == null) {
275                     current = currentComposite.getFirstSimpleByName(arg.nodeType);
276                 }
277                 if (current == null) {
278                     current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
279                 } if (current == null) {
280                     return null;
281                 }
282             }
283         }
284         return current;
285     }
286
287     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
288         val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
289         twoPhaseCommit.prepare()
290         return twoPhaseCommit;
291     }
292
293     def getCapabilities(Collection<String> capabilities) {
294         return FluentIterable.from(capabilities).filter[
295                 contains("?") && contains("module=") && contains("revision=")].transform [
296                 val parts = split("\\?");
297                 val namespace = parts.get(0);
298                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
299                 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
300                 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
301                 if (revision === null) {
302                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
303                     revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
304                     if (revision != null) {
305                         logger.warn("Netconf device returned revision incorectly escaped for {}", it)
306                     }
307                 }
308                 if (revision == null) {
309                     return QName.create(URI.create(namespace), null, moduleName);
310                 }
311                 return QName.create(namespace, revision, moduleName);
312             ].toSet();
313     }
314
315     override close() {
316         bringDown()
317     }
318 }
319
320 package class NetconfDeviceSchemaContextProvider {
321
322     @Property
323     val NetconfDevice device;
324
325     @Property
326     val SchemaSourceProvider<InputStream> sourceProvider;
327
328     @Property
329     var Optional<SchemaContext> currentContext;
330
331     new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
332         _device = device
333         _sourceProvider = sourceProvider
334         _currentContext = Optional.absent();
335     }
336
337     def createContextFromCapabilities(Iterable<QName> capabilities) {
338         val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
339         if (!sourceContext.missingSources.empty) {
340             device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
341         }
342         device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
343         val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
344         if (!sourceContext.validSources.empty) {
345             val schemaContext = tryToCreateContext(modelsToParse);
346             currentContext = Optional.fromNullable(schemaContext);
347         } else {
348             currentContext = Optional.absent();
349         }
350         if (currentContext.present) {
351             device.logger.debug("Schema context successfully created.");
352         }
353
354     }
355
356     def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
357         val parser = new YangParserImpl();
358         try {
359
360             val models = parser.parseYangModelsFromStreams(modelsToParse);
361             val result = parser.resolveSchemaContext(models);
362             return result;
363         } catch (Exception e) {
364             device.logger.debug("Error occured during parsing YANG schemas", e);
365             return null;
366         }
367     }
368 }