Added extended ietf-netconf-monitoring detection for Netconf devices
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 package org.opendaylight.controller.sal.connect.netconf
2
3 import com.google.common.base.Optional
4 import com.google.common.collect.FluentIterable
5 import io.netty.util.concurrent.EventExecutor
6 import java.io.InputStream
7 import java.net.InetSocketAddress
8 import java.net.URI
9 import java.util.Collections
10 import java.util.List
11 import java.util.Set
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Future
14 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
15 import org.opendaylight.controller.md.sal.common.api.data.DataModification
16 import org.opendaylight.controller.md.sal.common.api.data.DataReader
17 import org.opendaylight.controller.netconf.api.NetconfMessage
18 import org.opendaylight.controller.netconf.client.NetconfClient
19 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
21 import org.opendaylight.controller.sal.core.api.Provider
22 import org.opendaylight.controller.sal.core.api.RpcImplementation
23 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
24 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
25 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
26 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
27 import org.opendaylight.protocol.framework.ReconnectStrategy
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
29 import org.opendaylight.yangtools.concepts.Registration
30 import org.opendaylight.yangtools.yang.common.QName
31 import org.opendaylight.yangtools.yang.data.api.CompositeNode
32 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
33 import org.opendaylight.yangtools.yang.data.api.Node
34 import org.opendaylight.yangtools.yang.data.api.SimpleNode
35 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
36 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
37 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext
39 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
40 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
41 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
42 import org.opendaylight.yangtools.yang.model.util.repo.SourceIdentifier
43 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
44 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
45 import org.slf4j.Logger
46 import org.slf4j.LoggerFactory
47
48 import static com.google.common.base.Preconditions.*
49 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
50
51 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
52 import org.opendaylight.controller.netconf.util.xml.XmlUtil
53
/**
 * A single NETCONF-managed device exposed to the SAL.
 *
 * The device opens a {@link NetconfClient} session to {@code socketAddress}, builds a
 * schema context from the capabilities reported by that session and, when a mount
 * instance is available, registers itself as configuration reader, operational reader
 * and commit handler on the root path of that mount instance.
 *
 * Collaborators ({@code dispatcher}, {@code schemaSourceProvider}, {@code eventExecutor},
 * {@code processingExecutor}, ...) are injected through the generated property setters
 * before {@link #start()} is called.
 */
class NetconfDevice implements Provider, // 
DataReader<InstanceIdentifier, CompositeNode>, //
DataCommitHandler<InstanceIdentifier, CompositeNode>, //
RpcImplementation, //
AutoCloseable {

    /** Client session; assigned by the task created in {@link #startClientTask}. */
    var NetconfClient client;

    @Property
    var InetSocketAddress socketAddress;

    @Property
    var MountProvisionInstance mountInstance;

    @Property
    var EventExecutor eventExecutor;

    @Property
    var ExecutorService processingExecutor;

    @Property
    var InstanceIdentifier path;

    @Property
    var ReconnectStrategy reconnectStrategy;

    @Property
    var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;

    @Property
    private NetconfDeviceSchemaContextProvider deviceContextProvider

    protected val Logger logger

    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg

    val String name
    MountProvisionService mountService

    /** Retry count handed to {@code client.sendMessage} for every RPC. */
    int messageRetryCount = 5;

    /** Timeout handed to {@code client.sendMessage} for every RPC (presumably milliseconds - TODO confirm). */
    int messageTimeoutCount = 5 * 1000;

    /** Module QNames parsed from the session capabilities; computed once by {@link #getInitialCapabilities()}. */
    Set<QName> cachedCapabilities

    @Property
    var NetconfClientDispatcher dispatcher

    static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();

    @Property
    var SchemaSourceProvider<InputStream> remoteSourceProvider

    /**
     * Creates a device identified by {@code name}.
     *
     * The name is used for the logger name and as the key of the inventory node
     * identifier stored in {@code path}.
     */
    public new(String name) {
        this.name = name;
        this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
        this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
            Collections.singletonMap(INVENTORY_ID, name)).toInstance;
    }

    /**
     * Registers this device on the mount instance (if any) and submits the client
     * start-up task to the processing executor.
     *
     * @return future of the submitted start-up task
     * @throws IllegalStateException if the dispatcher, schema source provider, event
     *         executor or processing executor has not been set
     */
    def start() {
        checkState(dispatcher != null, "Dispatcher must be set.");
        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
        checkState(eventExecutor != null, "Event executor must be set.");
        // Validate up front: failing on submit() below would leave the registrations dangling.
        checkState(processingExecutor != null, "Processing executor must be set.");

        val listener = new NetconfDeviceListener(this, eventExecutor);
        val task = startClientTask(dispatcher, listener)
        if (mountInstance != null) {
            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
        }
        return processingExecutor.submit(task) as Future<Void>;
    }

    /**
     * Returns the schema context currently held by the device context provider, or
     * {@code Optional.absent()} while no provider has been created yet.
     */
    def Optional<SchemaContext> getSchemaContext() {
        if (deviceContextProvider == null) {
            return Optional.absent();
        }
        return deviceContextProvider.currentContext;
    }

    /**
     * Builds the task that connects the client and derives the schema context.
     *
     * The remote schema source provider is used when either the parsed module
     * capabilities or the raw capability strings advertise ietf-netconf-monitoring;
     * otherwise a no-op provider is used.
     */
    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {

        return [ |
            logger.info("Starting Netconf Client on: {}", socketAddress);
            client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
            logger.debug("Initial capabilities {}", initialCapabilities);
            var SchemaSourceProvider<String> delegate;
            // Second test covers devices that advertise the bare namespace without module/revision parameters.
            if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities) ||
                client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
                delegate = new NetconfRemoteSchemaSourceProvider(this);
            } else {
                logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
                delegate = SchemaSourceProviders.<String>noopProvider();
            }
            remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
            deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
            deviceContextProvider.createContextFromCapabilities(initialCapabilities);
            if (mountInstance != null && schemaContext.isPresent) {
                mountInstance.schemaContext = schemaContext.get();
            }
        ]
    }

    /**
     * Reads configuration data by issuing a get-config RPC against the running
     * source, filtered by {@code path}.
     *
     * @return the node found at {@code path} in the reply data, or {@code null} when
     *         the reply carries no result, no data element or no such node
     */
    override readConfigurationData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
        val data = result.result?.getFirstCompositeByName(NETCONF_DATA_QNAME);
        return data?.findNode(path) as CompositeNode;
    }

    /**
     * Reads operational data by issuing a get RPC filtered by {@code path}.
     *
     * @return the node found at {@code path} in the reply data, or {@code null} when
     *         the reply carries no result, no data element or no such node
     */
    override readOperationalData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
        val data = result.result?.getFirstCompositeByName(NETCONF_DATA_QNAME);
        return data?.findNode(path) as CompositeNode;
    }

    /** Reports no supported RPCs (always an empty set). */
    override getSupportedRpcs() {
        Collections.emptySet;
    }

    /**
     * Invokes the create-subscription RPC for the given stream name.
     */
    def createSubscription(String streamName) {
        // Naming the builder `it` makes it the implicit receiver of the calls below.
        val it = ImmutableCompositeNode.builder()
        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
        addLeaf("stream", streamName);
        invokeRpc(QName, toInstance())
    }

    /**
     * Converts the RPC to a NETCONF message, sends it and converts the reply back.
     *
     * Any exception is logged and rethrown unchanged.
     */
    override invokeRpc(QName rpc, CompositeNode input) {
        try {
            val message = rpc.toRpcMessage(input,schemaContext);
            val result = sendMessageImpl(message, messageRetryCount, messageTimeoutCount);
            return result.toRpcResult(rpc, schemaContext);

        } catch (Exception e) {
            logger.error("Rpc was not processed correctly.", e)
            throw e;
        }
    }

    /**
     * Sends {@code message} through the client and validates the reply with
     * {@code NetconfMapping.checkValidReply}.
     *
     * @return the reply message
     */
    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
        logger.debug("Send message {}",XmlUtil.toString(message.document))
        val result = client.sendMessage(message, retryCount, timeout);
        NetconfMapping.checkValidReply(message, result)
        return result;
    }

    /** Reports no provider functionality (always an empty set). */
    override getProviderFunctionality() {
        Collections.emptySet
    }

    /**
     * Writes the inventory node for this device into the operational and
     * configuration stores when it is not there yet, waits for the commit, and then
     * obtains the mount point for {@code path} if a mount service is available.
     */
    override onSessionInitiated(ProviderSession session) {
        val dataBroker = session.getService(DataBrokerService);

        val transaction = dataBroker.beginTransaction
        if (transaction.operationalNodeNotExisting) {
            transaction.putOperationalData(path, nodeWithId)
        }
        if (transaction.configurationNodeNotExisting) {
            transaction.putConfigurationData(path, nodeWithId)
        }
        transaction.commit().get();
        mountService = session.getService(MountProvisionService);
        mountInstance = mountService?.createOrGetMountPoint(path);
    }

    /** Builds the inventory node that carries only this device's id leaf. */
    def getNodeWithId() {
        val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
    }

    /** Returns true when the transaction reads no configuration data at {@code path}. */
    def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readConfigurationData(path);
    }

    /** Returns true when the transaction reads no operational data at {@code path}. */
    def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readOperationalData(path);
    }

    /**
     * Walks {@code identifier} down from {@code node} and returns the node it
     * addresses.
     *
     * At every step the child is looked up as a composite node and then as a simple
     * node, each time first by the exact QName and then by the QName without revision.
     *
     * @return the addressed node, or {@code null} if a step has no matching child or
     *         the path continues below a simple node
     */
    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {

        var Node<?> current = node;
        for (arg : identifier.path) {
            if (current instanceof SimpleNode<?>) {
                // A leaf has no children, so the remaining path cannot be resolved.
                return null;
            } else if (current instanceof CompositeNode) {
                val currentComposite = (current as CompositeNode);

                current = currentComposite.getFirstCompositeByName(arg.nodeType);
                if (current == null) {
                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    return null;
                }
            }
        }
        return current;
    }

    /**
     * Creates a two-phase commit transaction for {@code modification}, prepares it
     * and returns it.
     */
    override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
        twoPhaseCommit.prepare()
        return twoPhaseCommit;
    }

    /**
     * Returns the module QNames derived from the client's capability strings.
     *
     * Only capabilities that contain {@code ?}, {@code module=} and {@code revision=}
     * are considered. The result is computed on first use and cached afterwards.
     *
     * @return the parsed QNames, or {@code null} while the client or its capabilities
     *         are not available
     */
    def getInitialCapabilities() {
        val capabilities = client?.capabilities;
        if (capabilities == null) {
            return null;
        }
        if (cachedCapabilities == null) {
            cachedCapabilities = FluentIterable.from(capabilities).filter[
                contains("?") && contains("module=") && contains("revision=")].transform [
                val parts = split("\\?");
                val namespace = parts.get(0);
                val queryParams = FluentIterable.from(parts.get(1).split("&"));
                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
                if (revision === null) {
                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
                    // The query was split on "&", so an over-escaped "&amp;revision=" arrives here as "amp;revision=".
                    revision = queryParams.findFirst[startsWith("amp;revision=")]?.replaceAll("amp;revision=", "");
                    if (revision != null) {
                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
                    }
                }
                if (revision == null) {
                    return QName.create(URI.create(namespace), null, moduleName);
                }
                return QName.create(namespace, revision, moduleName);
            ].toSet();
        }
        return cachedCapabilities;
    }

    /**
     * Closes every registration made in {@link #start()} and then the client.
     * Members that were never assigned are skipped.
     */
    override close() {
        confReaderReg?.close()
        operReaderReg?.close()
        commitHandlerReg?.close()
        client?.close()
    }

}
307
/**
 * Builds and holds the schema context of one {@link NetconfDevice}.
 *
 * YANG sources are obtained from {@code sourceProvider}; the most recently built
 * context (or {@code Optional.absent()}) is exposed through {@code currentContext}.
 */
package class NetconfDeviceSchemaContextProvider {

    @Property
    val NetconfDevice device;

    @Property
    val SchemaSourceProvider<InputStream> sourceProvider;

    @Property
    var Optional<SchemaContext> currentContext;

    /** Creates a provider for {@code device} that starts out without a schema context. */
    new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
        _device = device
        _sourceProvider = sourceProvider
        _currentContext = Optional.absent();
    }

    /**
     * Resolves the sources for {@code capabilities} and replaces the current context
     * with one parsed from them.
     *
     * Missing sources are only logged. The context ends up absent when there are no
     * valid sources or when parsing fails.
     */
    def createContextFromCapabilities(Iterable<QName> capabilities) {
        val resolved = YangSourceContext.createFrom(capabilities, sourceProvider)
        val log = device.logger

        val missing = resolved.missingSources
        if (!missing.empty) {
            log.warn("Sources for following models are missing {}", missing);
        }
        log.debug("Trying to create schema context from {}", resolved.validSources)

        // Streams are requested before the emptiness check, exactly as the original did.
        val streams = YangSourceContext.getValidInputStreams(resolved);
        if (resolved.validSources.empty) {
            currentContext = Optional.absent();
        } else {
            currentContext = Optional.fromNullable(tryToCreateContext(streams));
        }

        if (currentContext.present) {
            log.debug("Schema context successfully created.");
        }

    }

    /**
     * Parses the given YANG streams and resolves them into a schema context.
     *
     * @return the resolved context, or {@code null} if parsing or resolving throws
     */
    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
        val yangParser = new YangParserImpl();
        try {
            return yangParser.resolveSchemaContext(yangParser.parseYangModelsFromStreams(modelsToParse));
        } catch (Exception e) {
            // Failure is reported to the caller as null, which becomes an absent context.
            device.logger.debug("Error occured during parsing YANG schemas", e);
            return null;
        }
    }
}