Statistics-Manager - Performance Improvement
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
23
24 import java.io.InputStream;
25 import java.net.InetSocketAddress;
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35
36 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
38 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
39 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
40 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
42 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
43 import org.opendaylight.controller.sal.core.api.Provider;
44 import org.opendaylight.controller.sal.core.api.RpcImplementation;
45 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
46 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
47 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
48 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
49 import org.opendaylight.protocol.framework.ReconnectStrategy;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
54 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.Node;
56 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
57 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
58 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
59 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
60 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
61 import org.opendaylight.yangtools.yang.model.api.Module;
62 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
65 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
66 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
67 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 import com.google.common.base.Function;
72 import com.google.common.base.Optional;
73 import com.google.common.base.Predicate;
74 import com.google.common.collect.FluentIterable;
75 import com.google.common.collect.Iterables;
76 import com.google.common.util.concurrent.ListenableFuture;
77 import io.netty.util.concurrent.EventExecutor;
78
79 public class NetconfDevice implements Provider, //
80         DataReader<InstanceIdentifier, CompositeNode>, //
81         DataCommitHandler<InstanceIdentifier, CompositeNode>, //
82         RpcImplementation, //
83         AutoCloseable {
84
85     InetSocketAddress socketAddress;
86
87     MountProvisionInstance mountInstance;
88
89     EventExecutor eventExecutor;
90
91     ExecutorService processingExecutor;
92
93     InstanceIdentifier path;
94
95     ReconnectStrategy reconnectStrategy;
96
97     AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
98
99     private NetconfDeviceSchemaContextProvider deviceContextProvider;
100
101     protected Logger logger;
102
103     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
104     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
105     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
106     List<RpcRegistration> rpcReg;
107
108     String name;
109
110     MountProvisionService mountService;
111
112     NetconfClientDispatcher dispatcher;
113
114     static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
115
116     SchemaSourceProvider<InputStream> remoteSourceProvider;
117
118     DataBrokerService dataBroker;
119
120     NetconfDeviceListener listener;
121
122     private boolean rollbackSupported;
123
124
125     public NetconfDevice(String name) {
126         this.name = name;
127         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
128         this.path = InstanceIdentifier.builder(INVENTORY_PATH)
129                 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
130     }
131
132     public void start() {
133         checkState(dispatcher != null, "Dispatcher must be set.");
134         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
135         checkState(eventExecutor != null, "Event executor must be set.");
136
137         listener = new NetconfDeviceListener(this);
138
139         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
140
141         dispatcher.createClient(socketAddress, listener, reconnectStrategy);
142     }
143
144     Optional<SchemaContext> getSchemaContext() {
145         if (deviceContextProvider == null) {
146             return Optional.absent();
147         }
148         return deviceContextProvider.currentContext;
149     }
150
151     void bringDown() {
152         if (rpcReg != null) {
153             for (RpcRegistration reg : rpcReg) {
154                 reg.close();
155             }
156             rpcReg = null;
157         }
158         closeGracefully(confReaderReg);
159         confReaderReg = null;
160         closeGracefully(operReaderReg);
161         operReaderReg = null;
162         closeGracefully(commitHandlerReg);
163         commitHandlerReg = null;
164
165         updateDeviceState(false, Collections.<QName> emptySet());
166     }
167
168     private void closeGracefully(final AutoCloseable resource) {
169         if (resource != null) {
170             try {
171                 resource.close();
172             } catch (Exception e) {
173                 logger.warn("Ignoring exception while closing {}", resource, e);
174             }
175         }
176     }
177
178     void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
179         // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
180         // Reason: delegate.getSchema blocks thread when waiting for response
181         // however, if the netty thread is blocked, no incoming message can be processed
182         // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
183         // TODO redesign +refactor
184         processingExecutor.submit(new Runnable() {
185             @Override
186             public void run() {
187                 NetconfDevice.this.rollbackSupported = rollbackSupported;
188                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
189                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
190                 deviceContextProvider.createContextFromCapabilities(capabilities);
191                 if (mountInstance != null && getSchemaContext().isPresent()) {
192                     mountInstance.setSchemaContext(getSchemaContext().get());
193                 }
194
195                 updateDeviceState(true, capabilities);
196
197                 if (mountInstance != null) {
198                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
199                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
200                     commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
201
202                     List<RpcRegistration> rpcs = new ArrayList<>();
203                     // TODO same condition twice
204                     if (mountInstance != null && getSchemaContext().isPresent()) {
205                         for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
206                             rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
207                         }
208                     }
209                     rpcReg = rpcs;
210                 }
211             }
212         });
213     }
214
215     private void updateDeviceState(boolean up, Set<QName> capabilities) {
216         DataModificationTransaction transaction = dataBroker.beginTransaction();
217
218         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
219         it.setQName(INVENTORY_NODE);
220         it.addLeaf(INVENTORY_ID, name);
221         it.addLeaf(INVENTORY_CONNECTED, up);
222
223         logger.debug("Client capabilities {}", capabilities);
224         for (QName capability : capabilities) {
225             it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
226         }
227
228         logger.debug("Update device state transaction " + transaction.getIdentifier()
229                 + " putting operational data started.");
230         transaction.removeOperationalData(path);
231         transaction.putOperationalData(path, it.toInstance());
232         logger.debug("Update device state transaction " + transaction.getIdentifier()
233                 + " putting operational data ended.");
234
235         // FIXME: this has to be asynchronous
236         RpcResult<TransactionStatus> transactionStatus = null;
237         try {
238             transactionStatus = transaction.commit().get();
239         } catch (InterruptedException e) {
240             throw new RuntimeException("Interrupted while waiting for response", e);
241         } catch (ExecutionException e) {
242             throw new RuntimeException("Read configuration data " + path + " failed", e);
243         }
244         // TODO better ex handling
245
246         if (transactionStatus.isSuccessful()) {
247             logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
248         } else {
249             logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
250             logger.debug("Update device state transaction status " + transaction.getStatus());
251         }
252     }
253
254     @Override
255     public CompositeNode readConfigurationData(InstanceIdentifier path) {
256         RpcResult<CompositeNode> result = null;
257         try {
258             result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
259                     wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
260         } catch (InterruptedException e) {
261             throw new RuntimeException("Interrupted while waiting for response", e);
262         } catch (ExecutionException e) {
263             throw new RuntimeException("Read configuration data " + path + " failed", e);
264         }
265
266         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
267         return data == null ? null : (CompositeNode) findNode(data, path);
268     }
269
270     @Override
271     public CompositeNode readOperationalData(InstanceIdentifier path) {
272         RpcResult<CompositeNode> result = null;
273         try {
274             result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
275         } catch (InterruptedException e) {
276             throw new RuntimeException("Interrupted while waiting for response", e);
277         } catch (ExecutionException e) {
278             throw new RuntimeException("Read configuration data " + path + " failed", e);
279         }
280
281         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
282         return (CompositeNode) findNode(data, path);
283     }
284
285     @Override
286     public Set<QName> getSupportedRpcs() {
287         return Collections.emptySet();
288     }
289
290     @Override
291     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
292         return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
293     }
294
295     @Override
296     public Collection<ProviderFunctionality> getProviderFunctionality() {
297         return Collections.emptySet();
298     }
299
300     @Override
301     public void onSessionInitiated(ProviderSession session) {
302         dataBroker = session.getService(DataBrokerService.class);
303
304         DataModificationTransaction transaction = dataBroker.beginTransaction();
305         if (operationalNodeNotExisting(transaction)) {
306             transaction.putOperationalData(path, getNodeWithId());
307         }
308         if (configurationNodeNotExisting(transaction)) {
309             transaction.putConfigurationData(path, getNodeWithId());
310         }
311
312         try {
313             transaction.commit().get();
314         } catch (InterruptedException e) {
315             throw new RuntimeException("Interrupted while waiting for response", e);
316         } catch (ExecutionException e) {
317             throw new RuntimeException("Read configuration data " + path + " failed", e);
318         }
319
320         mountService = session.getService(MountProvisionService.class);
321         if (mountService != null) {
322             mountInstance = mountService.createOrGetMountPoint(path);
323         }
324     }
325
326     CompositeNode getNodeWithId() {
327         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
328         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
329     }
330
331     boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
332         return null == transaction.readConfigurationData(path);
333     }
334
335     boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
336         return null == transaction.readOperationalData(path);
337     }
338
339     static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
340
341         Node<?> current = node;
342         for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
343             if (current instanceof SimpleNode<?>) {
344                 return null;
345             } else if (current instanceof CompositeNode) {
346                 CompositeNode currentComposite = (CompositeNode) current;
347
348                 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
349                 if (current == null) {
350                     current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
351                 }
352                 if (current == null) {
353                     current = currentComposite.getFirstSimpleByName(arg.getNodeType());
354                 }
355                 if (current == null) {
356                     current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
357                 }
358                 if (current == null) {
359                     return null;
360                 }
361             }
362         }
363         return current;
364     }
365
366     @Override
367     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
368             DataModification<InstanceIdentifier, CompositeNode> modification) {
369         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
370                 modification, true, rollbackSupported);
371         try {
372             twoPhaseCommit.prepare();
373         } catch (InterruptedException e) {
374             throw new RuntimeException("Interrupted while waiting for response", e);
375         } catch (ExecutionException e) {
376             throw new RuntimeException("Read configuration data " + path + " failed", e);
377         }
378          return twoPhaseCommit;
379     }
380
381     Set<QName> getCapabilities(Collection<String> capabilities) {
382         return FluentIterable.from(capabilities).filter(new Predicate<String>() {
383             @Override
384             public boolean apply(final String capability) {
385                 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
386             }
387         }).transform(new Function<String, QName>() {
388             @Override
389             public QName apply(final String capability) {
390                 String[] parts = capability.split("\\?");
391                 String namespace = parts[0];
392                 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
393
394                 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
395
396                 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
397
398                 if (revision == null) {
399                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
400                     revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
401
402                     if (revision != null) {
403                         logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
404                     }
405                 }
406                 if (revision == null) {
407                     return QName.create(URI.create(namespace), null, moduleName);
408                 }
409                 return QName.create(namespace, revision, moduleName);
410             }
411
412             private String getStringAndTransform(final Iterable<String> queryParams, final String match,
413                     final String substringToRemove) {
414                 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
415                     @Override
416                     public boolean apply(final String input) {
417                         return input.startsWith(match);
418                     }
419                 });
420
421                 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
422             }
423
424         }).toSet();
425     }
426
427     @Override
428     public void close() {
429         bringDown();
430     }
431
432     public String getName() {
433         return name;
434     }
435
436     public InetSocketAddress getSocketAddress() {
437         return socketAddress;
438     }
439
440     public MountProvisionInstance getMountInstance() {
441         return mountInstance;
442     }
443
444     public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
445         this.reconnectStrategy = reconnectStrategy;
446     }
447
448     public void setProcessingExecutor(final ExecutorService processingExecutor) {
449         this.processingExecutor = processingExecutor;
450     }
451
452     public void setSocketAddress(final InetSocketAddress socketAddress) {
453         this.socketAddress = socketAddress;
454     }
455
456     public void setEventExecutor(final EventExecutor eventExecutor) {
457         this.eventExecutor = eventExecutor;
458     }
459
460     public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
461         this.schemaSourceProvider = schemaSourceProvider;
462     }
463
464     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
465         this.dispatcher = dispatcher;
466     }
467 }
468
469 class NetconfDeviceSchemaContextProvider {
470
471     NetconfDevice device;
472
473     SchemaSourceProvider<InputStream> sourceProvider;
474
475     Optional<SchemaContext> currentContext;
476
477     NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
478         this.device = device;
479         this.sourceProvider = sourceProvider;
480         this.currentContext = Optional.absent();
481     }
482
483     void createContextFromCapabilities(Iterable<QName> capabilities) {
484         YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
485         if (!sourceContext.getMissingSources().isEmpty()) {
486             device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
487         }
488         device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
489         List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
490         if (!sourceContext.getValidSources().isEmpty()) {
491             SchemaContext schemaContext = tryToCreateContext(modelsToParse);
492             currentContext = Optional.fromNullable(schemaContext);
493         } else {
494             currentContext = Optional.absent();
495         }
496         if (currentContext.isPresent()) {
497             device.logger.debug("Schema context successfully created.");
498         }
499     }
500
501     SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
502         YangParserImpl parser = new YangParserImpl();
503         try {
504
505             Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
506             return parser.resolveSchemaContext(models);
507         } catch (Exception e) {
508             device.logger.debug("Error occured during parsing YANG schemas", e);
509             return null;
510         }
511     }
512 }