Fix thread safety issues in netconf client
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf
9
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
15 import java.net.URI
16 import java.util.Collections
17 import java.util.List
18 import java.util.Set
19 import java.util.concurrent.ExecutorService
20 import java.util.concurrent.Future
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification
23 import org.opendaylight.controller.md.sal.common.api.data.DataReader
24 import org.opendaylight.controller.netconf.api.NetconfMessage
25 import org.opendaylight.controller.netconf.client.NetconfClient
26 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
27 import org.opendaylight.controller.netconf.util.xml.XmlUtil
28 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
29 import org.opendaylight.controller.sal.core.api.Provider
30 import org.opendaylight.controller.sal.core.api.RpcImplementation
31 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
32 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
34 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
35 import org.opendaylight.protocol.framework.ReconnectStrategy
36 import org.opendaylight.yangtools.concepts.Registration
37 import org.opendaylight.yangtools.yang.common.QName
38 import org.opendaylight.yangtools.yang.data.api.CompositeNode
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
40 import org.opendaylight.yangtools.yang.data.api.Node
41 import org.opendaylight.yangtools.yang.data.api.SimpleNode
42 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
43 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
44 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext
46 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
47 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
48 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
49 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
50 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
51 import org.slf4j.Logger
52 import org.slf4j.LoggerFactory
53
54 import static com.google.common.base.Preconditions.*
55 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
56
57 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
58
59 class NetconfDevice implements Provider, // 
60 DataReader<InstanceIdentifier, CompositeNode>, //
61 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
62 RpcImplementation, //
63 AutoCloseable {
64
65     var NetconfClient client;
66
67     @Property
68     var InetSocketAddress socketAddress;
69
70     @Property
71     var MountProvisionInstance mountInstance;
72
73     @Property
74     var EventExecutor eventExecutor;
75
76     @Property
77     var ExecutorService processingExecutor;
78
79     @Property
80     var InstanceIdentifier path;
81
82     @Property
83     var ReconnectStrategy reconnectStrategy;
84
85     @Property
86     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
87
88     @Property
89     private NetconfDeviceSchemaContextProvider deviceContextProvider
90
91     protected val Logger logger
92
93     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
94     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
95     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
96
97     val String name
98     MountProvisionService mountService
99
100     int messegeRetryCount = 5;
101
102     int messageTimeoutCount = 5 * 1000;
103
104     Set<QName> cachedCapabilities
105
106     @Property
107     var NetconfClientDispatcher dispatcher
108
109     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
110
111     @Property
112     var SchemaSourceProvider<InputStream> remoteSourceProvider
113     
114     DataBrokerService dataBroker
115
116     public new(String name) {
117         this.name = name;
118         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
119         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
120             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
121     }
122
123     def start() {
124         checkState(dispatcher != null, "Dispatcher must be set.");
125         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
126         checkState(eventExecutor != null, "Event executor must be set.");
127
128         val listener = new NetconfDeviceListener(this);
129         val task = startClientTask(dispatcher, listener)
130         if (mountInstance != null) {
131             commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
132         }
133         return processingExecutor.submit(task) as Future<Void>;
134
135     //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
136     }
137
138     def Optional<SchemaContext> getSchemaContext() {
139         if (deviceContextProvider == null) {
140             return Optional.absent();
141         }
142         return deviceContextProvider.currentContext;
143     }
144
145     private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
146         return [ |
147             try {
148                 logger.info("Starting Netconf Client on: {}", socketAddress);
149                 client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
150                 logger.debug("Initial capabilities {}", initialCapabilities);
151                 var SchemaSourceProvider<String> delegate;
152                 if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
153                     delegate = new NetconfRemoteSchemaSourceProvider(this);
154                 }  else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
155                     delegate = new NetconfRemoteSchemaSourceProvider(this);
156                 } else {
157                     logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
158                     delegate = SchemaSourceProviders.<String>noopProvider();
159                 }
160                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
161                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
162                 deviceContextProvider.createContextFromCapabilities(initialCapabilities);
163                 if (mountInstance != null && schemaContext.isPresent) {
164                     mountInstance.schemaContext = schemaContext.get();
165                 }
166                 updateDeviceState()
167                 if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
168                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
169                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
170                 }
171             } catch (Exception e) {
172                 logger.error("Netconf client NOT started. ", e)
173             }
174         ]
175     }
176
177     private def updateDeviceState() {
178         val transaction = dataBroker.beginTransaction
179
180         val it = ImmutableCompositeNode.builder
181         setQName(INVENTORY_NODE)
182         addLeaf(INVENTORY_ID, name)
183         addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
184
185         logger.debug("Client capabilities {}", client.capabilities)
186         for (capability : client.capabilities) {
187             addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
188         }
189
190         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
191         transaction.putOperationalData(path, it.toInstance)
192         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
193         val transactionStatus = transaction.commit.get;
194
195         if (transactionStatus.successful) {
196             logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
197         } else {
198             logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
199             logger.debug("Update device state transaction status " + transaction.status)
200         }
201     }
202
203     override readConfigurationData(InstanceIdentifier path) {
204         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
205             wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
206         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
207         return data?.findNode(path) as CompositeNode;
208     }
209
210     override readOperationalData(InstanceIdentifier path) {
211         val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
212         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
213         return data?.findNode(path) as CompositeNode;
214     }
215
216     override getSupportedRpcs() {
217         Collections.emptySet;
218     }
219
220     def createSubscription(String streamName) {
221         val it = ImmutableCompositeNode.builder()
222         QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
223         addLeaf("stream", streamName);
224         invokeRpc(QName, toInstance())
225     }
226
227     override invokeRpc(QName rpc, CompositeNode input) {
228         try {
229             val message = rpc.toRpcMessage(input,schemaContext);
230             val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
231             return result.toRpcResult(rpc, schemaContext);
232
233         } catch (Exception e) {
234             logger.error("Rpc was not processed correctly.", e)
235             throw e;
236         }
237     }
238
239     def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
240         logger.debug("Send message {}",XmlUtil.toString(message.document))
241         val result = client.sendMessage(message, retryCount, timeout);
242         NetconfMapping.checkValidReply(message, result)
243         return result;
244     }
245
246     override getProviderFunctionality() {
247         Collections.emptySet
248     }
249
250     override onSessionInitiated(ProviderSession session) {
251         dataBroker = session.getService(DataBrokerService);
252
253         val transaction = dataBroker.beginTransaction
254         if (transaction.operationalNodeNotExisting) {
255             transaction.putOperationalData(path, nodeWithId)
256         }
257         if (transaction.configurationNodeNotExisting) {
258             transaction.putConfigurationData(path, nodeWithId)
259         }
260         transaction.commit().get();
261         mountService = session.getService(MountProvisionService);
262         mountInstance = mountService?.createOrGetMountPoint(path);
263     }
264
265     def getNodeWithId() {
266         val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
267         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
268     }
269
270     def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
271         return null === transaction.readConfigurationData(path);
272     }
273
274     def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
275         return null === transaction.readOperationalData(path);
276     }
277
278     static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
279
280         var Node<?> current = node;
281         for (arg : identifier.path) {
282             if (current instanceof SimpleNode<?>) {
283                 return null;
284             } else if (current instanceof CompositeNode) {
285                 val currentComposite = (current as CompositeNode);
286                 
287                 current = currentComposite.getFirstCompositeByName(arg.nodeType);
288                 if(current == null) {
289                     current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
290                 }
291                 if(current == null) {
292                     current = currentComposite.getFirstSimpleByName(arg.nodeType);
293                 }
294                 if (current == null) {
295                     current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
296                 } if (current == null) {
297                     return null;
298                 }
299             }
300         }
301         return current;
302     }
303
304     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
305         val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
306         twoPhaseCommit.prepare()
307         return twoPhaseCommit;
308     }
309
310     def getInitialCapabilities() {
311         val capabilities = client?.capabilities;
312         if (capabilities == null) {
313             return null;
314         }
315         if (cachedCapabilities == null) {
316             cachedCapabilities = FluentIterable.from(capabilities).filter[
317                 contains("?") && contains("module=") && contains("revision=")].transform [
318                 val parts = split("\\?");
319                 val namespace = parts.get(0);
320                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
321                 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
322                 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
323                 if (revision === null) {
324                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
325                     revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
326                     if (revision != null) {
327                         logger.warn("Netconf device returned revision incorectly escaped for {}", it)
328                     }
329                 }
330                 if (revision == null) {
331                     return QName.create(URI.create(namespace), null, moduleName);
332                 }
333                 return QName.create(namespace, revision, moduleName);
334             ].toSet();
335         }
336         return cachedCapabilities;
337     }
338
339     override close() {
340         confReaderReg?.close()
341         operReaderReg?.close()
342         client?.close()
343     }
344
345 }
346
347 package class NetconfDeviceSchemaContextProvider {
348
349     @Property
350     val NetconfDevice device;
351
352     @Property
353     val SchemaSourceProvider<InputStream> sourceProvider;
354
355     @Property
356     var Optional<SchemaContext> currentContext;
357
358     new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
359         _device = device
360         _sourceProvider = sourceProvider
361         _currentContext = Optional.absent();
362     }
363
364     def createContextFromCapabilities(Iterable<QName> capabilities) {
365         val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
366         if (!sourceContext.missingSources.empty) {
367             device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
368         }
369         device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
370         val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
371         if (!sourceContext.validSources.empty) {
372             val schemaContext = tryToCreateContext(modelsToParse);
373             currentContext = Optional.fromNullable(schemaContext);
374         } else {
375             currentContext = Optional.absent();
376         }
377         if (currentContext.present) {
378             device.logger.debug("Schema context successfully created.");
379         }
380
381     }
382
383     def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
384         val parser = new YangParserImpl();
385         try {
386
387             val models = parser.parseYangModelsFromStreams(modelsToParse);
388             val result = parser.resolveSchemaContext(models);
389             return result;
390         } catch (Exception e) {
391             device.logger.debug("Error occured during parsing YANG schemas", e);
392             return null;
393         }
394     }
395 }