3eb0472b5c5bf3a4b1079e731875529f072da7d0
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf
9
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
15 import java.net.URI
16 import java.util.Collections
17 import java.util.List
18 import java.util.Set
19 import java.util.concurrent.ExecutorService
20 import java.util.concurrent.Future
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification
23 import org.opendaylight.controller.md.sal.common.api.data.DataReader
24 import org.opendaylight.controller.netconf.api.NetconfMessage
25 import org.opendaylight.controller.netconf.client.NetconfClient
26 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
27 import org.opendaylight.controller.netconf.util.xml.XmlUtil
28 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
29 import org.opendaylight.controller.sal.core.api.Provider
30 import org.opendaylight.controller.sal.core.api.RpcImplementation
31 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
32 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
34 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
35 import org.opendaylight.protocol.framework.ReconnectStrategy
36 import org.opendaylight.yangtools.concepts.Registration
37 import org.opendaylight.yangtools.yang.common.QName
38 import org.opendaylight.yangtools.yang.data.api.CompositeNode
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
40 import org.opendaylight.yangtools.yang.data.api.Node
41 import org.opendaylight.yangtools.yang.data.api.SimpleNode
42 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
43 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
44 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext
46 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
47 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
48 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
49 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
50 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
51 import org.slf4j.Logger
52 import org.slf4j.LoggerFactory
53
54 import static com.google.common.base.Preconditions.*
55 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
56
57 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
58 import com.google.common.util.concurrent.Futures
59
60 class NetconfDevice implements Provider, // 
61 DataReader<InstanceIdentifier, CompositeNode>, //
62 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
63 RpcImplementation, //
64 AutoCloseable {
65
66     var NetconfClient client;
67
68     @Property
69     var InetSocketAddress socketAddress;
70
71     @Property
72     var MountProvisionInstance mountInstance;
73
74     @Property
75     var EventExecutor eventExecutor;
76
77     @Property
78     var ExecutorService processingExecutor;
79
80     @Property
81     var InstanceIdentifier path;
82
83     @Property
84     var ReconnectStrategy reconnectStrategy;
85
86     @Property
87     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
88
89     @Property
90     private NetconfDeviceSchemaContextProvider deviceContextProvider
91
92     protected val Logger logger
93
94     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
95     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
96     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
97
98     val String name
99     MountProvisionService mountService
100
101     int messegeRetryCount = 5;
102
103     int messageTimeoutCount = 5 * 1000;
104
105     Set<QName> cachedCapabilities
106
107     @Property
108     var NetconfClientDispatcher dispatcher
109
110     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
111
112     @Property
113     var SchemaSourceProvider<InputStream> remoteSourceProvider
114     
115     DataBrokerService dataBroker
116
117     public new(String name) {
118         this.name = name;
119         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
120         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
121             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
122     }
123
124     def start() {
125         checkState(dispatcher != null, "Dispatcher must be set.");
126         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
127         checkState(eventExecutor != null, "Event executor must be set.");
128
129         val listener = new NetconfDeviceListener(this);
130         val task = startClientTask(dispatcher, listener)
131         return processingExecutor.submit(task) as Future<Void>;
132
133     }
134
135     def Optional<SchemaContext> getSchemaContext() {
136         if (deviceContextProvider == null) {
137             return Optional.absent();
138         }
139         return deviceContextProvider.currentContext;
140     }
141
142     private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
143         return [ |
144             try {
145                 logger.info("Starting Netconf Client on: {}", socketAddress);
146                 client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
147                 logger.debug("Initial capabilities {}", initialCapabilities);
148                 var SchemaSourceProvider<String> delegate;
149                 if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
150                     delegate = new NetconfRemoteSchemaSourceProvider(this);
151                 }  else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
152                     delegate = new NetconfRemoteSchemaSourceProvider(this);
153                 } else {
154                     logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
155                     delegate = SchemaSourceProviders.<String>noopProvider();
156                 }
157                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
158                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
159                 deviceContextProvider.createContextFromCapabilities(initialCapabilities);
160                 if (mountInstance != null && schemaContext.isPresent) {
161                     mountInstance.schemaContext = schemaContext.get();
162                     val operations = schemaContext.get().operations;
163                     for (rpc : operations) {
164                         mountInstance.addRpcImplementation(rpc.QName, this);
165                     }
166                 }
167                 updateDeviceState()
168                 if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
169                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
170                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
171                     commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
172                 }
173             } catch (Exception e) {
174                 logger.error("Netconf client NOT started. ", e)
175             }
176         ]
177     }
178
179     private def updateDeviceState() {
180         val transaction = dataBroker.beginTransaction
181
182         val it = ImmutableCompositeNode.builder
183         setQName(INVENTORY_NODE)
184         addLeaf(INVENTORY_ID, name)
185         addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
186
187         logger.debug("Client capabilities {}", client.capabilities)
188         for (capability : client.capabilities) {
189             addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
190         }
191
192         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
193         transaction.putOperationalData(path, it.toInstance)
194         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
195         val transactionStatus = transaction.commit.get;
196
197         if (transactionStatus.successful) {
198             logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
199         } else {
200             logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
201             logger.debug("Update device state transaction status " + transaction.status)
202         }
203     }
204
205     override readConfigurationData(InstanceIdentifier path) {
206         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
207             wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
208         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
209         return data?.findNode(path) as CompositeNode;
210     }
211
212     override readOperationalData(InstanceIdentifier path) {
213         val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
214         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
215         return data?.findNode(path) as CompositeNode;
216     }
217
218     override getSupportedRpcs() {
219         Collections.emptySet;
220     }
221
222 //    def createSubscription(String streamName) {
223 //        val it = ImmutableCompositeNode.builder()
224 //        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
225 //        addLeaf("stream", streamName);
226 //        invokeRpc(QName, toInstance())
227 //    }
228
229     override invokeRpc(QName rpc, CompositeNode input) {
230         try {
231             val message = rpc.toRpcMessage(input,schemaContext);
232             val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
233             return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
234         } catch (Exception e) {
235             logger.error("Rpc was not processed correctly.", e)
236             throw e;
237         }
238     }
239
240     def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
241         logger.debug("Send message {}",XmlUtil.toString(message.document))
242         val result = client.sendMessage(message, retryCount, timeout);
243         NetconfMapping.checkValidReply(message, result)
244         return result;
245     }
246
247     override getProviderFunctionality() {
248         Collections.emptySet
249     }
250
251     override onSessionInitiated(ProviderSession session) {
252         dataBroker = session.getService(DataBrokerService);
253
254         val transaction = dataBroker.beginTransaction
255         if (transaction.operationalNodeNotExisting) {
256             transaction.putOperationalData(path, nodeWithId)
257         }
258         if (transaction.configurationNodeNotExisting) {
259             transaction.putConfigurationData(path, nodeWithId)
260         }
261         transaction.commit().get();
262         mountService = session.getService(MountProvisionService);
263         mountInstance = mountService?.createOrGetMountPoint(path);
264     }
265
266     def getNodeWithId() {
267         val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
268         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
269     }
270
271     def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
272         return null === transaction.readConfigurationData(path);
273     }
274
275     def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
276         return null === transaction.readOperationalData(path);
277     }
278
279     static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
280
281         var Node<?> current = node;
282         for (arg : identifier.path) {
283             if (current instanceof SimpleNode<?>) {
284                 return null;
285             } else if (current instanceof CompositeNode) {
286                 val currentComposite = (current as CompositeNode);
287                 
288                 current = currentComposite.getFirstCompositeByName(arg.nodeType);
289                 if(current == null) {
290                     current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
291                 }
292                 if(current == null) {
293                     current = currentComposite.getFirstSimpleByName(arg.nodeType);
294                 }
295                 if (current == null) {
296                     current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
297                 } if (current == null) {
298                     return null;
299                 }
300             }
301         }
302         return current;
303     }
304
305     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
306         val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
307         twoPhaseCommit.prepare()
308         return twoPhaseCommit;
309     }
310
311     def getInitialCapabilities() {
312         val capabilities = client?.capabilities;
313         if (capabilities == null) {
314             return null;
315         }
316         if (cachedCapabilities == null) {
317             cachedCapabilities = FluentIterable.from(capabilities).filter[
318                 contains("?") && contains("module=") && contains("revision=")].transform [
319                 val parts = split("\\?");
320                 val namespace = parts.get(0);
321                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
322                 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
323                 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
324                 if (revision === null) {
325                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
326                     revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
327                     if (revision != null) {
328                         logger.warn("Netconf device returned revision incorectly escaped for {}", it)
329                     }
330                 }
331                 if (revision == null) {
332                     return QName.create(URI.create(namespace), null, moduleName);
333                 }
334                 return QName.create(namespace, revision, moduleName);
335             ].toSet();
336         }
337         return cachedCapabilities;
338     }
339
340     override close() {
341         confReaderReg?.close()
342         operReaderReg?.close()
343         client?.close()
344     }
345 }
346
347 package class NetconfDeviceSchemaContextProvider {
348
349     @Property
350     val NetconfDevice device;
351
352     @Property
353     val SchemaSourceProvider<InputStream> sourceProvider;
354
355     @Property
356     var Optional<SchemaContext> currentContext;
357
358     new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
359         _device = device
360         _sourceProvider = sourceProvider
361         _currentContext = Optional.absent();
362     }
363
364     def createContextFromCapabilities(Iterable<QName> capabilities) {
365         val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
366         if (!sourceContext.missingSources.empty) {
367             device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
368         }
369         device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
370         val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
371         if (!sourceContext.validSources.empty) {
372             val schemaContext = tryToCreateContext(modelsToParse);
373             currentContext = Optional.fromNullable(schemaContext);
374         } else {
375             currentContext = Optional.absent();
376         }
377         if (currentContext.present) {
378             device.logger.debug("Schema context successfully created.");
379         }
380
381     }
382
383     def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
384         val parser = new YangParserImpl();
385         try {
386
387             val models = parser.parseYangModelsFromStreams(modelsToParse);
388             val result = parser.resolveSchemaContext(models);
389             return result;
390         } catch (Exception e) {
391             device.logger.debug("Error occured during parsing YANG schemas", e);
392             return null;
393         }
394     }
395 }