2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf;
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
24 import java.io.InputStream;
25 import java.net.InetSocketAddress;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.List;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
36 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
38 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
39 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
40 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
42 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
43 import org.opendaylight.controller.sal.core.api.Provider;
44 import org.opendaylight.controller.sal.core.api.RpcImplementation;
45 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
46 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
47 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
48 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
49 import org.opendaylight.protocol.framework.ReconnectStrategy;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
54 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.Node;
56 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
57 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
58 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
59 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
60 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
61 import org.opendaylight.yangtools.yang.model.api.Module;
62 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
65 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
66 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
67 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 import com.google.common.base.Function;
72 import com.google.common.base.Optional;
73 import com.google.common.base.Predicate;
74 import com.google.common.collect.FluentIterable;
75 import com.google.common.collect.Iterables;
76 import com.google.common.util.concurrent.ListenableFuture;
77 import io.netty.util.concurrent.EventExecutor;
79 public class NetconfDevice implements Provider, //
80 DataReader<InstanceIdentifier, CompositeNode>, //
81 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
// Remote endpoint handed to dispatcher.createClient() in start().
85 InetSocketAddress socketAddress;
// Mount point obtained from mountService.createOrGetMountPoint(path) in onSessionInitiated().
87 MountProvisionInstance mountInstance;
// start() refuses to run while this is null.
89 EventExecutor eventExecutor;
91 ExecutorService processingExecutor;
// Inventory path of this device; built in the constructor from INVENTORY_PATH and the device name.
93 InstanceIdentifier path;
// Passed through unchanged to dispatcher.createClient() in start().
95 ReconnectStrategy reconnectStrategy;
// start() refuses to run while this is null; bringUp() derives remoteSourceProvider from it.
97 AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
// Created in bringUp(); getSchemaContext() reports "absent" while this is still null.
99 private NetconfDeviceSchemaContextProvider deviceContextProvider;
// Per-device logger created in the constructor.
101 protected Logger logger;
// Registrations created against the mount point in bringUp().
103 Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
104 Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
105 Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
106 List<RpcRegistration> rpcReg;
// Looked up from the provider session in onSessionInitiated(); may be null.
110 MountProvisionService mountService;
// start() refuses to run while this is null.
112 NetconfClientDispatcher dispatcher;
// Empty instance identifier used as the registration path for readers and the commit handler.
114 static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
// Result of schemaSourceProvider.createInstanceFor(delegate), assigned in bringUp().
116 SchemaSourceProvider<InputStream> remoteSourceProvider;
// Looked up from the provider session in onSessionInitiated().
118 DataBrokerService dataBroker;
// Session listener created in start(); invokeRpc() sends requests through it.
120 NetconfDeviceListener listener;
/**
 * Creates a device handle identified by {@code name}.
 *
 * <p>The name becomes the key of the device's inventory node and is appended
 * to the logger name so that log output can be attributed to one device.
 *
 * @param name identifier of the device
 */
public NetconfDevice(String name) {
    this.name = name;
    // Class#toString() renders as "class <fqcn>"; getName() keeps the logger
    // inside the regular package hierarchy so package-level configuration applies.
    this.logger = LoggerFactory.getLogger(NetconfDevice.class.getName() + "#" + name);
    this.path = InstanceIdentifier.builder(INVENTORY_PATH)
            .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name))
            .toInstance();
}
129 public void start() {
130 checkState(dispatcher != null, "Dispatcher must be set.");
131 checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
132 checkState(eventExecutor != null, "Event executor must be set.");
134 listener = new NetconfDeviceListener(this);
136 logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
138 dispatcher.createClient(socketAddress, listener, reconnectStrategy);
141 Optional<SchemaContext> getSchemaContext() {
142 if (deviceContextProvider == null) {
143 return Optional.absent();
145 return deviceContextProvider.currentContext;
// Walk the RPC registrations, if any were recorded.
149 if (rpcReg != null) {
150 for (RpcRegistration reg : rpcReg) {
// Release the reader and commit-handler registrations and drop the references
// so that a repeated teardown sees null and becomes a no-op for them.
155 closeGracefully(confReaderReg);
156 confReaderReg = null;
157 closeGracefully(operReaderReg);
158 operReaderReg = null;
159 closeGracefully(commitHandlerReg);
160 commitHandlerReg = null;
// Publish the device as not connected, with an empty capability set.
162 updateDeviceState(false, Collections.<QName> emptySet());
// Best-effort close helper: a null resource is skipped, and an exception is
// reported at WARN ("Ignoring exception while closing ...") together with its stack trace.
165 private void closeGracefully(final AutoCloseable resource) {
166 if (resource != null) {
169 } catch (Exception e) {
170 logger.warn("Ignoring exception while closing {}", resource, e);
// Builds the schema context from the advertised capabilities, publishes the
// "connected" state and registers this device on its mount point.
175 void bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
// Wrap the device-supplied source provider with the configured caching provider.
176 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
177 deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
178 deviceContextProvider.createContextFromCapabilities(capabilities);
// The schema context is only pushed to the mount point when one could be created.
179 if (mountInstance != null && getSchemaContext().isPresent()) {
180 mountInstance.setSchemaContext(getSchemaContext().get());
// Write the connected flag and capabilities into the operational inventory node.
183 updateDeviceState(true, capabilities);
// Readers and the commit handler are registered at the root of the mount point.
185 if (mountInstance != null) {
186 confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
187 operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
188 commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
190 List<RpcRegistration> rpcs = new ArrayList<>();
191 // TODO same condition twice
// One RPC implementation registration per operation defined in the mount point's schema context.
192 if (mountInstance != null && getSchemaContext().isPresent()) {
193 for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
194 rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), this));
201 private void updateDeviceState(boolean up, Set<QName> capabilities) {
202 DataModificationTransaction transaction = dataBroker.beginTransaction();
204 CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
205 it.setQName(INVENTORY_NODE);
206 it.addLeaf(INVENTORY_ID, name);
207 it.addLeaf(INVENTORY_CONNECTED, up);
209 logger.debug("Client capabilities {}", capabilities);
210 for (QName capability : capabilities) {
211 it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
214 logger.debug("Update device state transaction " + transaction.getIdentifier()
215 + " putting operational data started.");
216 transaction.removeOperationalData(path);
217 transaction.putOperationalData(path, it.toInstance());
218 logger.debug("Update device state transaction " + transaction.getIdentifier()
219 + " putting operational data ended.");
221 // FIXME: this has to be asynchronous
222 RpcResult<TransactionStatus> transactionStatus = null;
224 transactionStatus = transaction.commit().get();
225 } catch (InterruptedException e) {
226 throw new RuntimeException("Interrupted while waiting for response", e);
227 } catch (ExecutionException e) {
228 throw new RuntimeException("Read configuration data " + path + " failed", e);
230 // TODO better ex handling
232 if (transactionStatus.isSuccessful()) {
233 logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
235 logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
236 logger.debug("Update device state transaction status " + transaction.getStatus());
241 public CompositeNode readConfigurationData(InstanceIdentifier path) {
242 RpcResult<CompositeNode> result = null;
244 result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
245 wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
246 } catch (InterruptedException e) {
247 throw new RuntimeException("Interrupted while waiting for response", e);
248 } catch (ExecutionException e) {
249 throw new RuntimeException("Read configuration data " + path + " failed", e);
252 CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
253 return data == null ? null : (CompositeNode) findNode(data, path);
257 public CompositeNode readOperationalData(InstanceIdentifier path) {
258 RpcResult<CompositeNode> result = null;
260 result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
261 } catch (InterruptedException e) {
262 throw new RuntimeException("Interrupted while waiting for response", e);
263 } catch (ExecutionException e) {
264 throw new RuntimeException("Read configuration data " + path + " failed", e);
267 CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
268 return (CompositeNode) findNode(data, path);
272 public Set<QName> getSupportedRpcs() {
273 return Collections.emptySet();
277 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
278 return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
282 public Collection<ProviderFunctionality> getProviderFunctionality() {
283 return Collections.emptySet();
287 public void onSessionInitiated(ProviderSession session) {
288 dataBroker = session.getService(DataBrokerService.class);
290 DataModificationTransaction transaction = dataBroker.beginTransaction();
291 if (operationalNodeNotExisting(transaction)) {
292 transaction.putOperationalData(path, getNodeWithId());
294 if (configurationNodeNotExisting(transaction)) {
295 transaction.putConfigurationData(path, getNodeWithId());
299 transaction.commit().get();
300 } catch (InterruptedException e) {
301 throw new RuntimeException("Interrupted while waiting for response", e);
302 } catch (ExecutionException e) {
303 throw new RuntimeException("Read configuration data " + path + " failed", e);
306 mountService = session.getService(MountProvisionService.class);
307 if (mountService != null) {
308 mountInstance = mountService.createOrGetMountPoint(path);
312 CompositeNode getNodeWithId() {
313 SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
314 return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
317 boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
318 return null == transaction.readConfigurationData(path);
321 boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
322 return null == transaction.readOperationalData(path);
// Walks down from node, one path argument of identifier at a time.
325 static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
327 Node<?> current = node;
328 for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
// A simple node has no children, so it cannot be descended into.
329 if (current instanceof SimpleNode<?>) {
331 } else if (current instanceof CompositeNode) {
332 CompositeNode currentComposite = (CompositeNode) current;
// Lookup order: composite child by full QName, composite child ignoring the
// revision, simple child by full QName, simple child ignoring the revision.
334 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
335 if (current == null) {
336 current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
338 if (current == null) {
339 current = currentComposite.getFirstSimpleByName(arg.getNodeType());
341 if (current == null) {
342 current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
// None of the four lookups matched this path argument.
344 if (current == null) {
// Creates a two-phase commit transaction for the modification and prepares it
// before handing it back to the caller.
353 public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
354 DataModification<InstanceIdentifier, CompositeNode> modification) {
355 NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
358 twoPhaseCommit.prepare();
359 } catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored before rethrowing.
360 throw new RuntimeException("Interrupted while waiting for response", e);
361 } catch (ExecutionException e) {
// NOTE(review): the message talks about reading configuration data although the
// failing call is prepare(), and "path" here is the device path, not the modified path.
362 throw new RuntimeException("Read configuration data " + path + " failed", e);
364 return twoPhaseCommit;
367 Set<QName> getCapabilities(Collection<String> capabilities) {
368 return FluentIterable.from(capabilities).filter(new Predicate<String>() {
370 public boolean apply(final String capability) {
371 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
373 }).transform(new Function<String, QName>() {
375 public QName apply(final String capability) {
376 String[] parts = capability.split("\\?");
377 String namespace = parts[0];
378 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
380 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
382 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
384 if (revision == null) {
385 logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
386 revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
388 if (revision != null) {
389 logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
392 if (revision == null) {
393 return QName.create(URI.create(namespace), null, moduleName);
395 return QName.create(namespace, revision, moduleName);
398 private String getStringAndTransform(final Iterable<String> queryParams, final String match,
399 final String substringToRemove) {
400 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
402 public boolean apply(final String input) {
403 return input.startsWith(match);
407 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
// NOTE(review): presumably the AutoCloseable entry point that shuts the device down — confirm against the body.
414 public void close() {
// NOTE(review): presumably returns the device name passed to the constructor — confirm against the body.
418 public String getName() {
422 public InetSocketAddress getSocketAddress() {
423 return socketAddress;
426 public MountProvisionInstance getMountInstance() {
427 return mountInstance;
430 public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
431 this.reconnectStrategy = reconnectStrategy;
434 public void setProcessingExecutor(final ExecutorService processingExecutor) {
435 this.processingExecutor = processingExecutor;
438 public void setSocketAddress(final InetSocketAddress socketAddress) {
439 this.socketAddress = socketAddress;
442 public void setEventExecutor(final EventExecutor eventExecutor) {
443 this.eventExecutor = eventExecutor;
446 public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
447 this.schemaSourceProvider = schemaSourceProvider;
450 public void setDispatcher(final NetconfClientDispatcher dispatcher) {
451 this.dispatcher = dispatcher;
455 class NetconfDeviceSchemaContextProvider {
// Owning device; its logger is used for all messages of this provider.
457 NetconfDevice device;
// Source of YANG model streams used when a context is created.
459 SchemaSourceProvider<InputStream> sourceProvider;
// Most recently created schema context; absent until creation succeeds.
461 Optional<SchemaContext> currentContext;
/**
 * Creates a provider that starts out without a schema context.
 *
 * @param device         device this provider belongs to
 * @param sourceProvider source of YANG model streams
 */
NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
    this.currentContext = Optional.absent();
    this.sourceProvider = sourceProvider;
    this.device = device;
}
469 void createContextFromCapabilities(Iterable<QName> capabilities) {
470 YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
471 if (!sourceContext.getMissingSources().isEmpty()) {
472 device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
474 device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
475 List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
476 if (!sourceContext.getValidSources().isEmpty()) {
477 SchemaContext schemaContext = tryToCreateContext(modelsToParse);
478 currentContext = Optional.fromNullable(schemaContext);
480 currentContext = Optional.absent();
482 if (currentContext.isPresent()) {
483 device.logger.debug("Schema context successfully created.");
487 SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
488 YangParserImpl parser = new YangParserImpl();
491 Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
492 return parser.resolveSchemaContext(models);
493 } catch (Exception e) {
494 device.logger.debug("Error occured during parsing YANG schemas", e);