Updated implementation of Netconf, fixed DOM Mountpoint
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
1 package org.opendaylight.controller.sal.connect.netconf
2
3 import com.google.common.base.Optional
4 import com.google.common.collect.FluentIterable
5 import io.netty.util.concurrent.EventExecutor
6 import java.io.InputStream
7 import java.net.InetSocketAddress
8 import java.net.URI
9 import java.util.Collections
10 import java.util.List
11 import java.util.Set
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Future
14 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
15 import org.opendaylight.controller.md.sal.common.api.data.DataModification
16 import org.opendaylight.controller.md.sal.common.api.data.DataReader
17 import org.opendaylight.controller.netconf.api.NetconfMessage
18 import org.opendaylight.controller.netconf.client.NetconfClient
19 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
21 import org.opendaylight.controller.sal.core.api.Provider
22 import org.opendaylight.controller.sal.core.api.RpcImplementation
23 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
24 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
25 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
26 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
27 import org.opendaylight.protocol.framework.ReconnectStrategy
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
29 import org.opendaylight.yangtools.concepts.Registration
30 import org.opendaylight.yangtools.yang.common.QName
31 import org.opendaylight.yangtools.yang.data.api.CompositeNode
32 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
33 import org.opendaylight.yangtools.yang.data.api.Node
34 import org.opendaylight.yangtools.yang.data.api.SimpleNode
35 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
36 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
37 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext
39 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
40 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
41 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
42 import org.opendaylight.yangtools.yang.model.util.repo.SourceIdentifier
43 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
44 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
45 import org.slf4j.Logger
46 import org.slf4j.LoggerFactory
47
48 import static com.google.common.base.Preconditions.*
49 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
50
51 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
52 import org.opendaylight.controller.netconf.util.xml.XmlUtil
53
/**
 * Connector for a single remote NETCONF device.
 *
 * The device registers itself as configuration reader, operational reader and
 * commit handler on its mount point (when one is available) and translates
 * reads and RPC invocations into NETCONF messages sent through a
 * {@link NetconfClient}.
 *
 * Collaborators ({@code dispatcher}, {@code schemaSourceProvider},
 * {@code eventExecutor}, {@code processingExecutor}, ...) are injected through
 * the generated property setters and must be set before {@link #start()} is
 * called.
 */
class NetconfDevice implements Provider, // 
DataReader<InstanceIdentifier, CompositeNode>, //
DataCommitHandler<InstanceIdentifier, CompositeNode>, //
RpcImplementation, //
AutoCloseable {

    /** Client session to the device; created asynchronously by the task built in startClientTask. */
    var NetconfClient client;

    @Property
    var InetSocketAddress socketAddress;

    @Property
    var MountProvisionInstance mountInstance;

    @Property
    var EventExecutor eventExecutor;

    @Property
    var ExecutorService processingExecutor;

    @Property
    var InstanceIdentifier path;

    @Property
    var ReconnectStrategy reconnectStrategy;

    @Property
    var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;

    @Property
    private NetconfDeviceSchemaContextProvider deviceContextProvider

    protected val Logger logger

    Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
    Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
    Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg

    val String name
    MountProvisionService mountService

    /** Retry count handed to NetconfClient.sendMessage for every RPC. */
    int messageRetryCount = 5;

    /** Timeout handed to NetconfClient.sendMessage; presumably milliseconds - TODO confirm against NetconfClient. */
    int messageTimeoutCount = 5 * 1000;

    /** Module capabilities parsed from the client's capability strings; computed once, see getInitialCapabilities. */
    Set<QName> cachedCapabilities

    @Property
    var NetconfClientDispatcher dispatcher

    static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();

    @Property
    var SchemaSourceProvider<InputStream> remoteSourceProvider

    /**
     * Creates a device with the given name. The name is used as the inventory
     * node key of {@code path} and as a suffix of the logger name.
     */
    public new(String name) {
        this.name = name;
        this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
        this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
            Collections.singletonMap(INVENTORY_ID, name)).toInstance;
    }

    /**
     * Registers this device on its mount point (if any) and submits the client
     * start-up task to the processing executor.
     *
     * @return future of the submitted start-up task
     * @throws IllegalStateException if the dispatcher, schema source provider,
     *         event executor or processing executor has not been set
     */
    def start() {
        checkState(dispatcher != null, "Dispatcher must be set.");
        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
        checkState(eventExecutor != null, "Event executor must be set.");
        // Fail fast, before anything is registered, instead of with an NPE on submit().
        checkState(processingExecutor != null, "Processing executor must be set.");

        val listener = new NetconfDeviceListener(this, eventExecutor);
        val task = startClientTask(dispatcher, listener)
        if (mountInstance != null) {
            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
        }
        return processingExecutor.submit(task) as Future<Void>;
    }

    /**
     * Returns the schema context currently held by the device context
     * provider, or absent when the provider has not been created yet.
     */
    def Optional<SchemaContext> getSchemaContext() {
        if (deviceContextProvider == null) {
            return Optional.absent();
        }
        return deviceContextProvider.currentContext;
    }

    /**
     * Builds the task that connects the client, selects a schema source
     * delegate based on the reported capabilities, creates the schema context
     * and publishes it to the mount point when both are available.
     */
    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {

        return [ |
            logger.info("Starting Netconf Client on: {}", socketAddress);
            client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
            logger.debug("Initial capabilities {}", initialCapabilities);
            var SchemaSourceProvider<String> delegate;
            if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
                delegate = new NetconfRemoteSchemaSourceProvider(this);
            }  else {
                logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
                delegate = SchemaSourceProviders.<String>noopProvider();
            }
            remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
            deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
            deviceContextProvider.createContextFromCapabilities(initialCapabilities);
            if (mountInstance != null && schemaContext.isPresent) {
                mountInstance.schemaContext = schemaContext.get();
            }
        ]
    }

    /**
     * Reads configuration data by issuing a get-config RPC against the running
     * source, filtered by the given path.
     *
     * @return the node found at {@code path} in the reply data, or null when
     *         the reply carries no result, no data element or no such node
     */
    override readConfigurationData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
        // The RPC result payload may be missing; treat that like a missing data element.
        val data = result.result?.getFirstCompositeByName(NETCONF_DATA_QNAME);
        return data?.findNode(path) as CompositeNode;
    }

    /**
     * Reads operational data by issuing a get RPC filtered by the given path.
     *
     * @return the node found at {@code path} in the reply data, or null when
     *         the reply carries no result, no data element or no such node
     */
    override readOperationalData(InstanceIdentifier path) {
        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
        // The RPC result payload may be missing; treat that like a missing data element.
        val data = result.result?.getFirstCompositeByName(NETCONF_DATA_QNAME);
        return data?.findNode(path) as CompositeNode;
    }

    /** This implementation advertises no RPCs. */
    override getSupportedRpcs() {
        Collections.emptySet;
    }

    /**
     * Invokes the create-subscription RPC with a single "stream" leaf set to
     * the given stream name.
     */
    def createSubscription(String streamName) {
        val it = ImmutableCompositeNode.builder()
        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
        addLeaf("stream", streamName);
        invokeRpc(QName, toInstance())
    }

    /**
     * Converts the RPC into a NETCONF message, sends it to the device and
     * converts the reply back into an RPC result. Any exception is logged and
     * rethrown unchanged.
     */
    override invokeRpc(QName rpc, CompositeNode input) {
        try {
            val message = rpc.toRpcMessage(input,schemaContext);
            val result = sendMessageImpl(message, messageRetryCount, messageTimeoutCount);
            return result.toRpcResult(rpc, schemaContext);

        } catch (Exception e) {
            logger.error("Rpc was not processed correctly.", e)
            throw e;
        }
    }

    /**
     * Sends the message through the client and validates the reply with
     * {@code NetconfMapping.checkValidReply} before returning it.
     */
    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
        logger.debug("Send message {}",XmlUtil.toString(message.document))
        val result = client.sendMessage(message, retryCount, timeout);
        NetconfMapping.checkValidReply(message, result)
        return result;
    }

    /** This provider declares no additional provider functionality. */
    override getProviderFunctionality() {
        Collections.emptySet
    }

    /**
     * Ensures the inventory node for this device exists in both the
     * operational and configuration stores, then obtains the mount point for
     * the device path (when a mount provision service is available).
     */
    override onSessionInitiated(ProviderSession session) {
        val dataBroker = session.getService(DataBrokerService);

        val transaction = dataBroker.beginTransaction
        if (transaction.operationalNodeNotExisting) {
            transaction.putOperationalData(path, nodeWithId)
        }
        if (transaction.configurationNodeNotExisting) {
            transaction.putConfigurationData(path, nodeWithId)
        }
        // Block until the inventory node is committed before creating the mount point.
        transaction.commit().get();
        mountService = session.getService(MountProvisionService);
        mountInstance = mountService?.createOrGetMountPoint(path);
    }

    /** Builds the inventory node for this device: a composite holding only the id leaf. */
    def getNodeWithId() {
        val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
    }

    /** Returns true when the transaction reads no configuration data at this device's path. */
    def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readConfigurationData(path);
    }

    /** Returns true when the transaction reads no operational data at this device's path. */
    def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
        return null === transaction.readOperationalData(path);
    }

    /**
     * Walks the given node along the path arguments of the identifier.
     *
     * For every path argument the child is looked up by node type in this
     * order: composite child, composite child ignoring revision, simple child,
     * simple child ignoring revision.
     *
     * @return the node reached after consuming all path arguments, or null if
     *         a child is missing or a simple node is reached while arguments
     *         remain
     */
    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {

        var Node<?> current = node;
        for (arg : identifier.path) {
            if (current instanceof SimpleNode<?>) {
                // A leaf has no children, so the rest of the path cannot be resolved.
                return null;
            } else if (current instanceof CompositeNode) {
                val currentComposite = (current as CompositeNode);

                current = currentComposite.getFirstCompositeByName(arg.nodeType);
                if (current == null) {
                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
                }
                if (current == null) {
                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
                }
                if (current == null) {
                    return null;
                }
            }
        }
        return current;
    }

    /**
     * Creates a two-phase commit transaction for the modification, runs its
     * prepare step and returns it.
     */
    override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
        twoPhaseCommit.prepare()
        return twoPhaseCommit;
    }

    /**
     * Returns the module capabilities reported by the client as QNames, or
     * null when there is no client or it reports no capabilities.
     *
     * Only capability strings containing "?", "module=" and "revision=" are
     * considered. The result is computed on first successful call and cached.
     */
    def getInitialCapabilities() {
        val capabilities = client?.capabilities;
        if (capabilities == null) {
            return null;
        }
        if (cachedCapabilities == null) {
            cachedCapabilities = FluentIterable.from(capabilities).filter[
                contains("?") && contains("module=") && contains("revision=")].transform [
                val parts = split("\\?");
                val namespace = parts.get(0);
                val queryParams = FluentIterable.from(parts.get(1).split("&"));
                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
                if (revision === null) {
                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
                    // The query string was split on "&", so an incorrectly escaped
                    // "&amp;revision=" parameter shows up here as "amp;revision=".
                    revision = queryParams.findFirst[startsWith("amp;revision=")]?.replaceAll("amp;revision=", "");
                    if (revision != null) {
                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
                    }
                }
                if (revision == null) {
                    return QName.create(URI.create(namespace), null, moduleName);
                }
                return QName.create(namespace, revision, moduleName);
            ].toSet();
        }
        return cachedCapabilities;
    }

    /**
     * Closes every mount-point registration made in {@link #start()} and then
     * the client. Members that were never created are skipped.
     */
    override close() {
        confReaderReg?.close()
        operReaderReg?.close()
        commitHandlerReg?.close()
        client?.close()
    }

}
305
/**
 * Builds and holds the schema context of a single {@link NetconfDevice} from
 * the YANG sources that the given source provider can deliver for the
 * device's capabilities.
 */
package class NetconfDeviceSchemaContextProvider {

    @Property
    val NetconfDevice device;

    @Property
    val SchemaSourceProvider<InputStream> sourceProvider;

    /**
     * Most recently created schema context. Starts out absent so that readers
     * never observe a null Optional before the first context is created.
     */
    @Property
    var Optional<SchemaContext> currentContext = Optional.<SchemaContext>absent();

    new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
        _device = device
        _sourceProvider = sourceProvider
    }

    /**
     * Resolves YANG sources for the given capabilities and replaces the
     * current context with the result of parsing them. The context becomes
     * absent when no valid source is available or parsing fails.
     */
    def createContextFromCapabilities(Iterable<QName> capabilities) {
        val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
        if (!sourceContext.missingSources.empty) {
            device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
        }
        device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
        if (!sourceContext.validSources.empty) {
            // Only ask for the input streams when there is something to parse.
            val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
            val schemaContext = tryToCreateContext(modelsToParse);
            currentContext = Optional.fromNullable(schemaContext);
        } else {
            currentContext = Optional.absent();
        }
        if (currentContext.present) {
            device.logger.debug("Schema context successfully created.");
        }

    }

    /**
     * Parses the given YANG model streams and resolves them into a schema
     * context.
     *
     * @return the resolved schema context, or null if parsing or resolution
     *         threw an exception
     */
    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
        val parser = new YangParserImpl();
        try {

            val models = parser.parseYangModelsFromStreams(modelsToParse);
            val result = parser.resolveSchemaContext(models);
            return result;
        } catch (Exception e) {
            // Logged at warn: a failure here leaves the device without any schema context.
            device.logger.warn("Error occured during parsing YANG schemas", e);
            return null;
        }
    }
}