Bug 1073: Implemented Transaction chain on InMemoryDOMDataStore level.
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
23
24 import com.google.common.base.Preconditions;
25 import java.io.InputStream;
26 import java.net.InetSocketAddress;
27 import java.net.URI;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36
37 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
38 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
39 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
40 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
41 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
43 import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
46 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
47 import org.opendaylight.controller.sal.core.api.Provider;
48 import org.opendaylight.controller.sal.core.api.RpcImplementation;
49 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
50 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
51 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
52 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
53 import org.opendaylight.protocol.framework.ReconnectStrategy;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
56 import org.opendaylight.yangtools.concepts.Registration;
57 import org.opendaylight.yangtools.yang.common.QName;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
60 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.data.api.Node;
62 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
63 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
64 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
65 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
66 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
67 import org.opendaylight.yangtools.yang.model.api.Module;
68 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
69 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
71 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
72 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
73 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import com.google.common.base.Function;
78 import com.google.common.base.Optional;
79 import com.google.common.base.Predicate;
80 import com.google.common.collect.FluentIterable;
81 import com.google.common.collect.Iterables;
82 import com.google.common.util.concurrent.ListenableFuture;
83 import io.netty.util.concurrent.EventExecutor;
84
85 public class NetconfDevice implements Provider, //
86         DataReader<InstanceIdentifier, CompositeNode>, //
87         DataCommitHandler<InstanceIdentifier, CompositeNode>, //
88         RpcImplementation, //
89         AutoCloseable {
90
91     InetSocketAddress socketAddress;
92
93     MountProvisionInstance mountInstance;
94
95     EventExecutor eventExecutor;
96
97     ExecutorService processingExecutor;
98
99     InstanceIdentifier path;
100
101     ReconnectStrategy reconnectStrategy;
102
103     AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
104
105     private NetconfDeviceSchemaContextProvider deviceContextProvider;
106
107     protected Logger logger;
108
109     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
110     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
111     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
112     List<RpcRegistration> rpcReg;
113
114     String name;
115
116     MountProvisionService mountService;
117
118     NetconfClientDispatcher dispatcher;
119
120     static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
121
122     SchemaSourceProvider<InputStream> remoteSourceProvider;
123
124     private volatile DataBrokerService dataBroker;
125
126     NetconfDeviceListener listener;
127
128     private boolean rollbackSupported;
129
130     private NetconfReconnectingClientConfiguration clientConfig;
131     private volatile DataProviderService dataProviderService;
132
133     public NetconfDevice(String name) {
134         this.name = name;
135         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
136         this.path = InstanceIdentifier.builder(INVENTORY_PATH)
137                 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
138     }
139
140     public void start() {
141         checkState(dispatcher != null, "Dispatcher must be set.");
142         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
143         checkState(eventExecutor != null, "Event executor must be set.");
144
145         Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
146         listener = (NetconfDeviceListener) clientConfig.getSessionListener();
147
148         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
149
150         dispatcher.createReconnectingClient(clientConfig);
151     }
152
153     Optional<SchemaContext> getSchemaContext() {
154         if (deviceContextProvider == null) {
155             return Optional.absent();
156         }
157         return deviceContextProvider.currentContext;
158     }
159
160     void bringDown() {
161         if (rpcReg != null) {
162             for (RpcRegistration reg : rpcReg) {
163                 reg.close();
164             }
165             rpcReg = null;
166         }
167         closeGracefully(confReaderReg);
168         confReaderReg = null;
169         closeGracefully(operReaderReg);
170         operReaderReg = null;
171         closeGracefully(commitHandlerReg);
172         commitHandlerReg = null;
173
174         updateDeviceState(false, Collections.<QName> emptySet());
175     }
176
177     private void closeGracefully(final AutoCloseable resource) {
178         if (resource != null) {
179             try {
180                 resource.close();
181             } catch (Exception e) {
182                 logger.warn("Ignoring exception while closing {}", resource, e);
183             }
184         }
185     }
186
187     void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
188         // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
189         // Reason: delegate.getSchema blocks thread when waiting for response
190         // however, if the netty thread is blocked, no incoming message can be processed
191         // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
192         // TODO redesign +refactor
193         processingExecutor.submit(new Runnable() {
194             @Override
195             public void run() {
196                 NetconfDevice.this.rollbackSupported = rollbackSupported;
197                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
198                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
199                 deviceContextProvider.createContextFromCapabilities(capabilities);
200                 if (mountInstance != null && getSchemaContext().isPresent()) {
201                     mountInstance.setSchemaContext(getSchemaContext().get());
202                 }
203
204                 updateDeviceState(true, capabilities);
205
206                 if (mountInstance != null) {
207                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
208                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
209                     commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
210
211                     List<RpcRegistration> rpcs = new ArrayList<>();
212                     // TODO same condition twice
213                     if (mountInstance != null && getSchemaContext().isPresent()) {
214                         for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
215                             rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
216                         }
217                     }
218                     rpcReg = rpcs;
219                 }
220             }
221         });
222     }
223
224     private void updateDeviceState(boolean up, Set<QName> capabilities) {
225         checkDataStoreState();
226
227         DataModificationTransaction transaction = dataBroker.beginTransaction();
228
229         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
230         it.setQName(INVENTORY_NODE);
231         it.addLeaf(INVENTORY_ID, name);
232         it.addLeaf(INVENTORY_CONNECTED, up);
233
234         logger.debug("Client capabilities {}", capabilities);
235         for (QName capability : capabilities) {
236             it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
237         }
238
239         logger.debug("Update device state transaction " + transaction.getIdentifier()
240                 + " putting operational data started.");
241         transaction.removeOperationalData(path);
242         transaction.putOperationalData(path, it.toInstance());
243         logger.debug("Update device state transaction " + transaction.getIdentifier()
244                 + " putting operational data ended.");
245
246         // FIXME: this has to be asynchronous
247         RpcResult<TransactionStatus> transactionStatus = null;
248         try {
249             transactionStatus = transaction.commit().get();
250         } catch (InterruptedException e) {
251             throw new RuntimeException("Interrupted while waiting for response", e);
252         } catch (ExecutionException e) {
253             throw new RuntimeException("Read configuration data " + path + " failed", e);
254         }
255         // TODO better ex handling
256
257         if (transactionStatus.isSuccessful()) {
258             logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
259         } else {
260             logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
261             logger.debug("Update device state transaction status " + transaction.getStatus());
262         }
263     }
264
265     @Override
266     public CompositeNode readConfigurationData(InstanceIdentifier path) {
267         RpcResult<CompositeNode> result = null;
268         try {
269             result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
270                     wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
271         } catch (InterruptedException e) {
272             throw new RuntimeException("Interrupted while waiting for response", e);
273         } catch (ExecutionException e) {
274             throw new RuntimeException("Read configuration data " + path + " failed", e);
275         }
276
277         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
278         return data == null ? null : (CompositeNode) findNode(data, path);
279     }
280
281     @Override
282     public CompositeNode readOperationalData(InstanceIdentifier path) {
283         RpcResult<CompositeNode> result = null;
284         try {
285             result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
286         } catch (InterruptedException e) {
287             throw new RuntimeException("Interrupted while waiting for response", e);
288         } catch (ExecutionException e) {
289             throw new RuntimeException("Read configuration data " + path + " failed", e);
290         }
291
292         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
293         return (CompositeNode) findNode(data, path);
294     }
295
296     @Override
297     public Set<QName> getSupportedRpcs() {
298         return Collections.emptySet();
299     }
300
301     @Override
302     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
303         return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
304     }
305
306     @Override
307     public Collection<ProviderFunctionality> getProviderFunctionality() {
308         return Collections.emptySet();
309     }
310
311     @Override
312     public void onSessionInitiated(ProviderSession session) {
313         dataBroker = session.getService(DataBrokerService.class);
314
315         processingExecutor.submit(new Runnable() {
316             @Override
317             public void run() {
318                 updateInitialState();
319             }
320         });
321
322         mountService = session.getService(MountProvisionService.class);
323         if (mountService != null) {
324             mountInstance = mountService.createOrGetMountPoint(path);
325         }
326     }
327
328     private void updateInitialState() {
329         checkDataStoreState();
330
331         DataModificationTransaction transaction = dataBroker.beginTransaction();
332         if (operationalNodeNotExisting(transaction)) {
333             transaction.putOperationalData(path, getNodeWithId());
334         }
335         if (configurationNodeNotExisting(transaction)) {
336             transaction.putConfigurationData(path, getNodeWithId());
337         }
338
339         try {
340             transaction.commit().get();
341         } catch (InterruptedException e) {
342             throw new RuntimeException("Interrupted while waiting for response", e);
343         } catch (ExecutionException e) {
344             throw new RuntimeException("Read configuration data " + path + " failed", e);
345         }
346     }
347
348     private void checkDataStoreState() {
349         // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
350         dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
351                 Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build());    }
352
353     CompositeNode getNodeWithId() {
354         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
355         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
356     }
357
358     boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
359         return null == transaction.readConfigurationData(path);
360     }
361
362     boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
363         return null == transaction.readOperationalData(path);
364     }
365
366     static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
367
368         Node<?> current = node;
369         for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
370             if (current instanceof SimpleNode<?>) {
371                 return null;
372             } else if (current instanceof CompositeNode) {
373                 CompositeNode currentComposite = (CompositeNode) current;
374
375                 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
376                 if (current == null) {
377                     current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
378                 }
379                 if (current == null) {
380                     current = currentComposite.getFirstSimpleByName(arg.getNodeType());
381                 }
382                 if (current == null) {
383                     current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
384                 }
385                 if (current == null) {
386                     return null;
387                 }
388             }
389         }
390         return current;
391     }
392
393     @Override
394     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
395             DataModification<InstanceIdentifier, CompositeNode> modification) {
396         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
397                 modification, true, rollbackSupported);
398         try {
399             twoPhaseCommit.prepare();
400         } catch (InterruptedException e) {
401             throw new RuntimeException("Interrupted while waiting for response", e);
402         } catch (ExecutionException e) {
403             throw new RuntimeException("Read configuration data " + path + " failed", e);
404         }
405          return twoPhaseCommit;
406     }
407
408     Set<QName> getCapabilities(Collection<String> capabilities) {
409         return FluentIterable.from(capabilities).filter(new Predicate<String>() {
410             @Override
411             public boolean apply(final String capability) {
412                 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
413             }
414         }).transform(new Function<String, QName>() {
415             @Override
416             public QName apply(final String capability) {
417                 String[] parts = capability.split("\\?");
418                 String namespace = parts[0];
419                 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
420
421                 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
422
423                 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
424
425                 if (revision == null) {
426                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
427                     revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
428
429                     if (revision != null) {
430                         logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
431                     }
432                 }
433                 if (revision == null) {
434                     return QName.create(URI.create(namespace), null, moduleName);
435                 }
436                 return QName.create(namespace, revision, moduleName);
437             }
438
439             private String getStringAndTransform(final Iterable<String> queryParams, final String match,
440                     final String substringToRemove) {
441                 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
442                     @Override
443                     public boolean apply(final String input) {
444                         return input.startsWith(match);
445                     }
446                 });
447
448                 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
449             }
450
451         }).toSet();
452     }
453
454     @Override
455     public void close() {
456         bringDown();
457     }
458
459     public String getName() {
460         return name;
461     }
462
463     public InetSocketAddress getSocketAddress() {
464         return socketAddress;
465     }
466
467     public MountProvisionInstance getMountInstance() {
468         return mountInstance;
469     }
470
471     public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
472         this.reconnectStrategy = reconnectStrategy;
473     }
474
475     public void setProcessingExecutor(final ExecutorService processingExecutor) {
476         this.processingExecutor = processingExecutor;
477     }
478
479     public void setSocketAddress(final InetSocketAddress socketAddress) {
480         this.socketAddress = socketAddress;
481     }
482
483     public void setEventExecutor(final EventExecutor eventExecutor) {
484         this.eventExecutor = eventExecutor;
485     }
486
487     public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
488         this.schemaSourceProvider = schemaSourceProvider;
489     }
490
491     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
492         this.dispatcher = dispatcher;
493     }
494
495     public void setClientConfig(final NetconfReconnectingClientConfiguration clientConfig) {
496         this.clientConfig = clientConfig;
497     }
498
499     public void setDataProviderService(final DataProviderService dataProviderService) {
500         this.dataProviderService = dataProviderService;
501     }
502 }
503
504 class NetconfDeviceSchemaContextProvider {
505
506     NetconfDevice device;
507
508     SchemaSourceProvider<InputStream> sourceProvider;
509
510     Optional<SchemaContext> currentContext;
511
512     NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
513         this.device = device;
514         this.sourceProvider = sourceProvider;
515         this.currentContext = Optional.absent();
516     }
517
518     void createContextFromCapabilities(Iterable<QName> capabilities) {
519         YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
520         if (!sourceContext.getMissingSources().isEmpty()) {
521             device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
522         }
523         device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
524         List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
525         if (!sourceContext.getValidSources().isEmpty()) {
526             SchemaContext schemaContext = tryToCreateContext(modelsToParse);
527             currentContext = Optional.fromNullable(schemaContext);
528         } else {
529             currentContext = Optional.absent();
530         }
531         if (currentContext.isPresent()) {
532             device.logger.debug("Schema context successfully created.");
533         }
534     }
535
536     SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
537         YangParserImpl parser = new YangParserImpl();
538         try {
539
540             Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
541             return parser.resolveSchemaContext(models);
542         } catch (Exception e) {
543             device.logger.debug("Error occured during parsing YANG schemas", e);
544             return null;
545         }
546     }
547 }