Added support for registering an RPC implementation.
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf
9
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
15 import java.net.URI
16 import java.util.Collections
17 import java.util.List
18 import java.util.Set
19 import java.util.concurrent.ExecutorService
20 import java.util.concurrent.Future
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification
23 import org.opendaylight.controller.md.sal.common.api.data.DataReader
24 import org.opendaylight.controller.netconf.api.NetconfMessage
25 import org.opendaylight.controller.netconf.client.NetconfClient
26 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
27 import org.opendaylight.controller.netconf.util.xml.XmlUtil
28 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
29 import org.opendaylight.controller.sal.core.api.Provider
30 import org.opendaylight.controller.sal.core.api.RpcImplementation
31 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
32 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
34 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
35 import org.opendaylight.protocol.framework.ReconnectStrategy
36 import org.opendaylight.yangtools.concepts.Registration
37 import org.opendaylight.yangtools.yang.common.QName
38 import org.opendaylight.yangtools.yang.data.api.CompositeNode
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
40 import org.opendaylight.yangtools.yang.data.api.Node
41 import org.opendaylight.yangtools.yang.data.api.SimpleNode
42 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
43 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
44 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext
46 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
47 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
48 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
49 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
50 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
51 import org.slf4j.Logger
52 import org.slf4j.LoggerFactory
53
54 import static com.google.common.base.Preconditions.*
55 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
56
57 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
58
/**
 * A single NETCONF device exposed through the SAL.
 *
 * Once started, the device opens a {@link NetconfClient} session to
 * {@code socketAddress}, builds a schema context from the capabilities the
 * client reports and, when a mount instance is available, registers itself
 * on it as configuration reader, operational reader, commit handler and RPC
 * implementation.
 *
 * NOTE(review): several fields are written from the task submitted to
 * {@code processingExecutor} and read from other callers without any
 * synchronization visible in this class - confirm the threading model.
 */
class NetconfDevice implements Provider, // 
DataReader<InstanceIdentifier, CompositeNode>, //
DataCommitHandler<InstanceIdentifier, CompositeNode>, //
RpcImplementation, //
AutoCloseable {

    /** Client session to the device; assigned by the task created in {@link #startClientTask}. */
    var NetconfClient client;

    @Property
    var InetSocketAddress socketAddress;

    @Property
    var MountProvisionInstance mountInstance;

    @Property
    var EventExecutor eventExecutor;

    /** Executor on which the client start-up task is submitted by {@link #start}. */
    @Property
    var ExecutorService processingExecutor;

    /** Inventory path of this device, derived from its name in the constructor. */
    @Property
    var InstanceIdentifier path;

    @Property
    var ReconnectStrategy reconnectStrategy;

    @Property
    var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;

    @Property
    private NetconfDeviceSchemaContextProvider deviceContextProvider

    protected val Logger logger

    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg

    val String name
    MountProvisionService mountService

    /** Retry count handed to {@code client.sendMessage} for every RPC. */
    int messageRetryCount = 5;

    /** Timeout handed to {@code client.sendMessage}; presumably milliseconds - TODO confirm. */
    int messageTimeoutCount = 5 * 1000;

    /** Lazily computed result of {@link #getInitialCapabilities}. */
    Set<QName> cachedCapabilities

    @Property
    var NetconfClientDispatcher dispatcher

    static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();

    @Property
    var SchemaSourceProvider<InputStream> remoteSourceProvider
    
    /** Data broker obtained in {@link #onSessionInitiated}; used to publish device state. */
    DataBrokerService dataBroker

    /**
     * Creates a device with the given name. The name is used as the inventory
     * key in {@code path} and as a suffix of the logger name.
     */
    public new(String name) {
        this.name = name;
        this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
        this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
            Collections.singletonMap(INVENTORY_ID, name)).toInstance;
    }

    /**
     * Submits the client start-up task to {@code processingExecutor}.
     *
     * @return future of the submitted task
     * @throws IllegalStateException if the dispatcher, schema source provider,
     *         event executor or processing executor has not been set
     */
    def start() {
        checkState(dispatcher != null, "Dispatcher must be set.");
        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
        checkState(eventExecutor != null, "Event executor must be set.");
        // Dereferenced right below; fail with a clear message instead of a bare NPE.
        checkState(processingExecutor != null, "Processing executor must be set.");

        val listener = new NetconfDeviceListener(this);
        val task = startClientTask(dispatcher, listener)
        return processingExecutor.submit(task) as Future<Void>;

    }

    /**
     * Returns the schema context currently held by the device context provider,
     * or an absent value when no provider has been created yet.
     */
    def Optional<SchemaContext> getSchemaContext() {
        if (deviceContextProvider == null) {
            return Optional.absent();
        }
        return deviceContextProvider.currentContext;
    }

    /**
     * Builds the task that connects the client, resolves the schema context and
     * registers this device on the mount instance. Any exception raised while
     * the task runs is logged and not propagated.
     */
    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
        return [ |
            try {
                logger.info("Starting Netconf Client on: {}", socketAddress);
                client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
                logger.debug("Initial capabilities {}", initialCapabilities);

                // Schemas can be downloaded from the device when monitoring support is
                // advertised either as a module capability or as a plain capability string.
                val remoteSchemasSupported = NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities) ||
                    client.capabilities.contains(
                        NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)
                var SchemaSourceProvider<String> delegate;
                if (remoteSchemasSupported) {
                    delegate = new NetconfRemoteSchemaSourceProvider(this);
                } else {
                    logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
                    delegate = SchemaSourceProviders.<String>noopProvider();
                }
                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
                deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
                deviceContextProvider.createContextFromCapabilities(initialCapabilities);
                if (mountInstance != null && schemaContext.isPresent) {
                    mountInstance.schemaContext = schemaContext.get();
                    // Expose every RPC defined in the device schema through this device.
                    val operations = schemaContext.get().operations;
                    for (rpc : operations) {
                        mountInstance.addRpcImplementation(rpc.QName, this);
                    }
                }
                updateDeviceState()
                if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
                }
            } catch (Exception e) {
                logger.error("Netconf client NOT started. ", e)
            }
        ]
    }

    /**
     * Writes the inventory node of this device (id, connection flag and the
     * capabilities reported by the client) into the operational data store and
     * waits for the commit to finish.
     */
    private def updateDeviceState() {
        val transaction = dataBroker.beginTransaction

        val it = ImmutableCompositeNode.builder
        setQName(INVENTORY_NODE)
        addLeaf(INVENTORY_ID, name)
        addLeaf(INVENTORY_CONNECTED, client.clientSession.up)

        logger.debug("Client capabilities {}", client.capabilities)
        for (capability : client.capabilities) {
            addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
        }

        logger.debug("Update device state transaction {} putting operational data started.", transaction.identifier)
        transaction.putOperationalData(path, it.toInstance)
        logger.debug("Update device state transaction {} putting operational data ended.", transaction.identifier)
        val transactionStatus = transaction.commit.get;

        if (transactionStatus.successful) {
            logger.debug("Update device state transaction {} SUCCESSFUL.", transaction.identifier)
        } else {
            // A failed commit leaves the inventory stale; make it visible at default log levels.
            logger.warn("Update device state transaction {} FAILED!", transaction.identifier)
            logger.warn("Update device state transaction status {}", transaction.status)
        }
    }

    /**
     * Reads configuration data by invoking get-config against the running
     * source with a filter built from {@code path}.
     *
     * @return the node found at {@code path} inside the reply data, or
     *         {@code null} when the reply has no data element or the node is missing
     */
    override readConfigurationData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
        val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
        return data?.findNode(path) as CompositeNode;
    }

    /**
     * Reads operational data by invoking get with a filter built from {@code path}.
     *
     * @return the node found at {@code path} inside the reply data, or
     *         {@code null} when the reply has no data element or the node is missing
     */
    override readOperationalData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
        val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
        return data?.findNode(path) as CompositeNode;
    }

    /**
     * Always returns an empty set.
     *
     * NOTE(review): RPC implementations are registered per schema operation in
     * the start-up task, yet none of them is reported here - confirm this is intended.
     */
    override getSupportedRpcs() {
        Collections.emptySet;
    }

    /**
     * Invokes the create-subscription RPC with a single "stream" leaf.
     *
     * @param streamName value of the "stream" leaf
     * @return result of the RPC invocation
     */
    def createSubscription(String streamName) {
        val it = ImmutableCompositeNode.builder()
        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
        addLeaf("stream", streamName);
        invokeRpc(QName, toInstance())
    }

    /**
     * Converts the RPC into a NETCONF message, sends it to the device and
     * converts the reply back into an RPC result. Any exception is logged and
     * rethrown unchanged.
     */
    override invokeRpc(QName rpc, CompositeNode input) {
        try {
            val message = rpc.toRpcMessage(input,schemaContext);
            val result = sendMessageImpl(message, messageRetryCount, messageTimeoutCount);
            return result.toRpcResult(rpc, schemaContext);

        } catch (Exception e) {
            logger.error("Rpc was not processed correctly.", e)
            throw e;
        }
    }

    /**
     * Sends the message through the client and validates the reply with
     * {@code NetconfMapping.checkValidReply}.
     *
     * @param message request to send
     * @param retryCount retry count passed to the client
     * @param timeout timeout passed to the client
     * @return the reply received from the client
     */
    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
        // Serializing the document is costly; skip it unless the output is actually logged.
        if (logger.debugEnabled) {
            logger.debug("Send message {}",XmlUtil.toString(message.document))
        }
        val result = client.sendMessage(message, retryCount, timeout);
        NetconfMapping.checkValidReply(message, result)
        return result;
    }

    /** Returns an empty set; this provider declares no additional functionality. */
    override getProviderFunctionality() {
        Collections.emptySet
    }

    /**
     * Obtains the data broker and mount service from the session, makes sure the
     * inventory node of this device exists in both data stores and creates (or
     * looks up) the mount point for {@code path}.
     */
    override onSessionInitiated(ProviderSession session) {
        dataBroker = session.getService(DataBrokerService);

        val transaction = dataBroker.beginTransaction
        if (transaction.operationalNodeNotExisting) {
            transaction.putOperationalData(path, nodeWithId)
        }
        if (transaction.configurationNodeNotExisting) {
            transaction.putConfigurationData(path, nodeWithId)
        }
        transaction.commit().get();
        mountService = session.getService(MountProvisionService);
        mountInstance = mountService?.createOrGetMountPoint(path);
    }

    /** Builds an inventory node that contains only the id leaf of this device. */
    def getNodeWithId() {
        val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
    }

    /** Returns true when the transaction reads no configuration data at {@code path}. */
    def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readConfigurationData(path);
    }

    /** Returns true when the transaction reads no operational data at {@code path}. */
    def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readOperationalData(path);
    }

    /**
     * Walks {@code identifier} starting at {@code node}. For every path argument
     * the child is looked up as a composite node by its node type, then as a
     * composite node by the node type without revision, then as a simple node
     * the same two ways.
     *
     * @return the node reached at the end of the path, or {@code null} when a
     *         child is missing or a simple node is met while path arguments remain
     */
    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {

        var Node<?> current = node;
        for (arg : identifier.path) {
            if (current instanceof SimpleNode<?>) {
                // A leaf has no children, so the rest of the path cannot be resolved.
                return null;
            } else if (current instanceof CompositeNode) {
                val currentComposite = (current as CompositeNode);

                current = currentComposite.getFirstCompositeByName(arg.nodeType);
                if (current == null) {
                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    return null;
                }
            }
        }
        return current;
    }

    /**
     * Wraps the modification in a two-phase commit transaction, prepares it and
     * returns it.
     */
    override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
        twoPhaseCommit.prepare()
        return twoPhaseCommit;
    }

    /**
     * Returns the module capabilities reported by the client as QNames, or
     * {@code null} when there is no client or it reports no capabilities.
     *
     * Only capability strings that contain "?", "module=" and "revision=" are
     * considered. The result is computed once and cached.
     */
    def getInitialCapabilities() {
        val capabilities = client?.capabilities;
        if (capabilities == null) {
            return null;
        }
        if (cachedCapabilities == null) {
            cachedCapabilities = FluentIterable.from(capabilities).filter[
                contains("?") && contains("module=") && contains("revision=")].transform [
                val parts = split("\\?");
                val namespace = parts.get(0);
                val queryParams = FluentIterable.from(parts.get(1).split("&"));
                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
                if (revision === null) {
                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
                    // The query string was split on "&", so a parameter escaped as
                    // "&amp;revision=" shows up here as "amp;revision=...".
                    revision = queryParams.findFirst[startsWith("amp;revision=")]?.replaceAll("amp;revision=", "");
                    if (revision != null) {
                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
                    }
                }
                if (revision == null) {
                    return QName.create(URI.create(namespace), null, moduleName);
                }
                return QName.create(namespace, revision, moduleName);
            ].toSet();
        }
        return cachedCapabilities;
    }

    /**
     * Closes the reader and commit-handler registrations held by this device
     * and then the client, skipping whichever of them was never created.
     */
    override close() {
        confReaderReg?.close()
        operReaderReg?.close()
        commitHandlerReg?.close()
        client?.close()
    }

}
347
/**
 * Builds and holds the schema context of a {@link NetconfDevice} from YANG
 * sources obtained through a {@link SchemaSourceProvider}.
 */
package class NetconfDeviceSchemaContextProvider {

    @Property
    val NetconfDevice device;

    @Property
    val SchemaSourceProvider<InputStream> sourceProvider;

    /** Most recently built schema context; absent until one is created successfully. */
    @Property
    var Optional<SchemaContext> currentContext;

    new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
        _device = device
        _sourceProvider = sourceProvider
        _currentContext = Optional.absent();
    }

    /**
     * Resolves YANG sources for the given capabilities and replaces
     * {@code currentContext} with the schema context parsed from them. The
     * context becomes absent when no valid source exists or parsing fails.
     *
     * @param capabilities module identifiers to resolve sources for
     */
    def createContextFromCapabilities(Iterable<QName> capabilities) {
        val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
        if (!sourceContext.missingSources.empty) {
            device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
        }
        device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
        val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
        try {
            if (!sourceContext.validSources.empty) {
                val schemaContext = tryToCreateContext(modelsToParse);
                currentContext = Optional.fromNullable(schemaContext);
            } else {
                currentContext = Optional.absent();
            }
        } finally {
            // The streams were opened for this call only; release them whether or not parsing succeeded.
            closeQuietly(modelsToParse)
        }
        if (currentContext.present) {
            device.logger.debug("Schema context successfully created.");
        }

    }

    /**
     * Parses the given YANG streams and resolves them into a schema context.
     * The streams are not closed by this method.
     *
     * @return the resolved schema context, or {@code null} when parsing or
     *         resolution throws
     */
    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
        val parser = new YangParserImpl();
        try {

            val models = parser.parseYangModelsFromStreams(modelsToParse);
            val result = parser.resolveSchemaContext(models);
            return result;
        } catch (Exception e) {
            // Without a schema context the device has no schema; report it at a visible level.
            device.logger.warn("Error occured during parsing YANG schemas", e);
            return null;
        }
    }

    /** Closes every stream in the list, logging (not propagating) close failures. */
    private def void closeQuietly(List<InputStream> streams) {
        if (streams === null) {
            return;
        }
        for (stream : streams) {
            try {
                stream?.close()
            } catch (Exception e) {
                device.logger.debug("Failed to close YANG source stream", e);
            }
        }
    }
}