Merge "BUG-509: add missing copyright headers"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf
9
10 import com.google.common.base.Optional
11 import com.google.common.collect.FluentIterable
12 import io.netty.util.concurrent.EventExecutor
13 import java.io.InputStream
14 import java.net.InetSocketAddress
15 import java.net.URI
16 import java.util.ArrayList
17 import java.util.Collection
18 import java.util.Collections
19 import java.util.List
20 import java.util.Set
21 import java.util.concurrent.ExecutorService
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
23 import org.opendaylight.controller.md.sal.common.api.data.DataModification
24 import org.opendaylight.controller.md.sal.common.api.data.DataReader
25 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
26 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
27 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
28 import org.opendaylight.controller.sal.core.api.Provider
29 import org.opendaylight.controller.sal.core.api.RpcImplementation
30 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
31 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
32 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
33 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
34 import org.opendaylight.protocol.framework.ReconnectStrategy
35 import org.opendaylight.yangtools.concepts.Registration
36 import org.opendaylight.yangtools.yang.common.QName
37 import org.opendaylight.yangtools.yang.data.api.CompositeNode
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
39 import org.opendaylight.yangtools.yang.data.api.Node
40 import org.opendaylight.yangtools.yang.data.api.SimpleNode
41 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
42 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
43 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext
45 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
46 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
47 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
48 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
49 import org.slf4j.Logger
50 import org.slf4j.LoggerFactory
51
52 import static com.google.common.base.Preconditions.*
53 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
54
55 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
56
57 class NetconfDevice implements Provider, //
58 DataReader<InstanceIdentifier, CompositeNode>, //
59 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
60 RpcImplementation, //
61 AutoCloseable {
62
63     @Property
64     var InetSocketAddress socketAddress;
65
66     @Property
67     var MountProvisionInstance mountInstance;
68
69     @Property
70     var EventExecutor eventExecutor;
71
72     @Property
73     var ExecutorService processingExecutor;
74
75     @Property
76     var InstanceIdentifier path;
77
78     @Property
79     var ReconnectStrategy reconnectStrategy;
80
81     @Property
82     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
83
84     @Property
85     private NetconfDeviceSchemaContextProvider deviceContextProvider
86
87     protected val Logger logger
88
89     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
90     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
91     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
92     List<RpcRegistration> rpcReg
93
94     @Property
95     val String name
96
97     MountProvisionService mountService
98
99     @Property
100     var NetconfClientDispatcher dispatcher
101
102     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
103
104     @Property
105     var SchemaSourceProvider<InputStream> remoteSourceProvider
106
107     DataBrokerService dataBroker
108
109     var NetconfDeviceListener listener;
110
111     public new(String name) {
112         this._name = name;
113         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
114         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
115             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
116     }
117
118     def start() {
119         checkState(dispatcher != null, "Dispatcher must be set.");
120         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
121         checkState(eventExecutor != null, "Event executor must be set.");
122
123         listener = new NetconfDeviceListener(this);
124
125         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
126
127         dispatcher.createClient(socketAddress, listener, reconnectStrategy);
128     }
129
130     def Optional<SchemaContext> getSchemaContext() {
131         if (deviceContextProvider == null) {
132             return Optional.absent();
133         }
134         return deviceContextProvider.currentContext;
135     }
136
137     def bringDown() {
138         if (rpcReg != null) {
139             for (reg : rpcReg) {
140                 reg.close()
141             }
142             rpcReg = null
143         }
144         confReaderReg?.close()
145         confReaderReg = null
146         operReaderReg?.close()
147         operReaderReg = null
148         commitHandlerReg?.close()
149         commitHandlerReg = null
150
151         updateDeviceState(false, Collections.emptySet())
152     }
153
154     def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
155         remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
156         deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
157         deviceContextProvider.createContextFromCapabilities(capabilities);
158         if (mountInstance != null && schemaContext.isPresent) {
159             mountInstance.schemaContext = schemaContext.get();
160         }
161
162         updateDeviceState(true, capabilities)
163
164         if (mountInstance != null) {
165             confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
166             operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
167             commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
168
169             val rpcs = new ArrayList<RpcRegistration>();
170             for (rpc : mountInstance.schemaContext.operations) {
171                 rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
172             }
173             rpcReg = rpcs
174         }
175     }
176
177     private def updateDeviceState(boolean up, Set<QName> capabilities) {
178         val transaction = dataBroker.beginTransaction
179
180         val it = ImmutableCompositeNode.builder
181         setQName(INVENTORY_NODE)
182         addLeaf(INVENTORY_ID, name)
183         addLeaf(INVENTORY_CONNECTED, up)
184
185         logger.debug("Client capabilities {}", capabilities)
186         for (capability : capabilities) {
187             addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
188         }
189
190         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
191         transaction.removeOperationalData(path)
192         transaction.putOperationalData(path, it.toInstance)
193         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
194
195         // FIXME: this has to be asynchronous
196         val transactionStatus = transaction.commit.get;
197
198         if (transactionStatus.successful) {
199             logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
200         } else {
201             logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
202             logger.debug("Update device state transaction status " + transaction.status)
203         }
204     }
205
206     override readConfigurationData(InstanceIdentifier path) {
207         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
208             wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
209         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
210         return data?.findNode(path) as CompositeNode;
211     }
212
213     override readOperationalData(InstanceIdentifier path) {
214         val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
215         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
216         return data?.findNode(path) as CompositeNode;
217     }
218
219     override getSupportedRpcs() {
220         Collections.emptySet;
221     }
222
223     override invokeRpc(QName rpc, CompositeNode input) {
224         return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
225     }
226
227     override getProviderFunctionality() {
228         Collections.emptySet
229     }
230
231     override onSessionInitiated(ProviderSession session) {
232         dataBroker = session.getService(DataBrokerService);
233
234         val transaction = dataBroker.beginTransaction
235         if (transaction.operationalNodeNotExisting) {
236             transaction.putOperationalData(path, nodeWithId)
237         }
238         if (transaction.configurationNodeNotExisting) {
239             transaction.putConfigurationData(path, nodeWithId)
240         }
241         transaction.commit().get();
242         mountService = session.getService(MountProvisionService);
243         mountInstance = mountService?.createOrGetMountPoint(path);
244     }
245
246     def getNodeWithId() {
247         val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
248         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
249     }
250
251     def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
252         return null === transaction.readConfigurationData(path);
253     }
254
255     def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
256         return null === transaction.readOperationalData(path);
257     }
258
259     static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
260
261         var Node<?> current = node;
262         for (arg : identifier.path) {
263             if (current instanceof SimpleNode<?>) {
264                 return null;
265             } else if (current instanceof CompositeNode) {
266                 val currentComposite = (current as CompositeNode);
267
268                 current = currentComposite.getFirstCompositeByName(arg.nodeType);
269                 if(current == null) {
270                     current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
271                 }
272                 if(current == null) {
273                     current = currentComposite.getFirstSimpleByName(arg.nodeType);
274                 }
275                 if (current == null) {
276                     current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
277                 } if (current == null) {
278                     return null;
279                 }
280             }
281         }
282         return current;
283     }
284
285     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
286         val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
287         twoPhaseCommit.prepare()
288         return twoPhaseCommit;
289     }
290
291     def getCapabilities(Collection<String> capabilities) {
292         return FluentIterable.from(capabilities).filter[
293                 contains("?") && contains("module=") && contains("revision=")].transform [
294                 val parts = split("\\?");
295                 val namespace = parts.get(0);
296                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
297                 var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
298                 val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
299                 if (revision === null) {
300                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
301                     revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
302                     if (revision != null) {
303                         logger.warn("Netconf device returned revision incorectly escaped for {}", it)
304                     }
305                 }
306                 if (revision == null) {
307                     return QName.create(URI.create(namespace), null, moduleName);
308                 }
309                 return QName.create(namespace, revision, moduleName);
310             ].toSet();
311     }
312
313     override close() {
314         bringDown()
315     }
316 }
317
318 package class NetconfDeviceSchemaContextProvider {
319
320     @Property
321     val NetconfDevice device;
322
323     @Property
324     val SchemaSourceProvider<InputStream> sourceProvider;
325
326     @Property
327     var Optional<SchemaContext> currentContext;
328
329     new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
330         _device = device
331         _sourceProvider = sourceProvider
332         _currentContext = Optional.absent();
333     }
334
335     def createContextFromCapabilities(Iterable<QName> capabilities) {
336         val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
337         if (!sourceContext.missingSources.empty) {
338             device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
339         }
340         device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
341         val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
342         if (!sourceContext.validSources.empty) {
343             val schemaContext = tryToCreateContext(modelsToParse);
344             currentContext = Optional.fromNullable(schemaContext);
345         } else {
346             currentContext = Optional.absent();
347         }
348         if (currentContext.present) {
349             device.logger.debug("Schema context successfully created.");
350         }
351
352     }
353
354     def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
355         val parser = new YangParserImpl();
356         try {
357
358             val models = parser.parseYangModelsFromStreams(modelsToParse);
359             val result = parser.resolveSchemaContext(models);
360             return result;
361         } catch (Exception e) {
362             device.logger.debug("Error occured during parsing YANG schemas", e);
363             return null;
364         }
365     }
366 }