X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.xtend;h=bfe352ad41322cf78404426a5582afc22a4e074d;hb=f46e0c0ac627304099c17783696bc0b5c9f622e1;hp=7c4bf5facad6a3dd94b0f1bd6a73e301a7820143;hpb=923d6b33391ace11b42e36a7f2d08cabe241a4f3;p=controller.git diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 7c4bf5faca..bfe352ad41 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -25,13 +25,43 @@ import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl import org.opendaylight.protocol.framework.ReconnectStrategy import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.md.sal.common.api.data.DataModification +import com.google.common.collect.FluentIterable +import org.opendaylight.yangtools.yang.model.api.SchemaContext +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState +import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl +import java.io.InputStream +import org.slf4j.LoggerFactory +import org.slf4j.Logger +import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener +import org.opendaylight.controller.netconf.client.NetconfClientSession +import org.opendaylight.controller.netconf.api.NetconfMessage +import io.netty.util.concurrent.EventExecutor -class NetconfDevice implements - Provider, // - DataReader, // - DataCommitHandler, // - RpcImplementation, // - AutoCloseable { +import java.util.Map +import java.util.Set +import com.google.common.collect.ImmutableMap + +import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider +import com.google.common.base.Optional +import com.google.common.collect.ImmutableList +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders +import static com.google.common.base.Preconditions.*; +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import org.opendaylight.controller.netconf.client.NetconfClientSessionListener +import io.netty.util.concurrent.Promise +import org.opendaylight.controller.netconf.util.xml.XmlElement +import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants +import java.util.concurrent.ExecutionException +import java.util.concurrent.locks.ReentrantLock + +class NetconfDevice implements Provider, // +DataReader, // +DataCommitHandler, // +RpcImplementation, // +AutoCloseable { var NetconfClient client; @@ -41,35 +71,97 @@ class NetconfDevice implements @Property var MountProvisionInstance mountInstance; + @Property + var EventExecutor eventExecutor; + + @Property + var ExecutorService processingExecutor; + @Property var InstanceIdentifier path; @Property - var ReconnectStrategy strategy; + var ReconnectStrategy reconnectStrategy; + + @Property + var AbstractCachingSchemaSourceProvider schemaSourceProvider; + + private NetconfDeviceSchemaContextProvider schemaContextProvider + + protected val Logger logger Registration> operReaderReg Registration> confReaderReg Registration> commitHandlerReg - + val String name MountProvisionService mountService + + int messegeRetryCount = 5; + + int messageTimeoutCount = 5 * 1000; + + Set cachedCapabilities + + @Property + var NetconfClientDispatcher dispatcher - + static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance(); + public new(String name) { this.name = name; + this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name); this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance; } - def start(NetconfClientDispatcher dispatcher) { - client = NetconfClient.clientFor(name, socketAddress, strategy, dispatcher); - confReaderReg = mountInstance.registerConfigurationReader(path, this); - operReaderReg = mountInstance.registerOperationalReader(path, this); - //commitHandlerReg = mountInstance.registerCommitHandler(path,this); + def start() { + checkState(dispatcher != null, "Dispatcher must be set."); + checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") + checkState(eventExecutor != null, "Event executor must be set."); + + val listener = new NetconfDeviceListener(this,eventExecutor); + val task = startClientTask(dispatcher, listener) + if(mountInstance != null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); + } + return processingExecutor.submit(task) as Future; + + //commitHandlerReg = mountInstance.registerCommitHandler(path,this); + } + + def Optional getSchemaContext() { + if (schemaContextProvider == null) { + return Optional.absent(); + } + return schemaContextProvider.currentContext; + } + + private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) { + return [ | + logger.info("Starting Netconf Client on: {}", socketAddress); + client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); + logger.debug("Initial capabilities {}", initialCapabilities); + var SchemaSourceProvider delegate; + if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) { + delegate = new NetconfDeviceSchemaSourceProvider(this); + } else { + logger.info("Device does not support IETF Netconf Monitoring.", socketAddress); + delegate = SchemaSourceProviders.noopProvider(); + } + val sourceProvider = schemaSourceProvider.createInstanceFor(delegate); + schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider); + schemaContextProvider.createContextFromCapabilities(initialCapabilities); + if (mountInstance != null && schemaContext.isPresent) { + mountInstance.schemaContext = schemaContext.get(); + } + ] } override readConfigurationData(InstanceIdentifier path) { - val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); + val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, + wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())); val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME); return data?.findNode(path) as CompositeNode; } @@ -83,10 +175,17 @@ class NetconfDevice implements override getSupportedRpcs() { Collections.emptySet; } + + def createSubscription(String streamName) { + val it = ImmutableCompositeNode.builder() + QName = NETCONF_CREATE_SUBSCRIPTION_QNAME + addLeaf("stream",streamName); + invokeRpc(QName,toInstance()) + } override invokeRpc(QName rpc, CompositeNode input) { val message = rpc.toRpcMessage(input); - val result = client.sendMessage(message); + val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount); return result.toRpcResult(); } @@ -96,30 +195,28 @@ class NetconfDevice implements override onSessionInitiated(ProviderSession session) { val dataBroker = session.getService(DataBrokerService); - - - + val transaction = dataBroker.beginTransaction - if(transaction.operationalNodeNotExisting) { - transaction.putOperationalData(path,nodeWithId) + if (transaction.operationalNodeNotExisting) { + transaction.putOperationalData(path, nodeWithId) } - if(transaction.configurationNodeNotExisting) { - transaction.putConfigurationData(path,nodeWithId) + if (transaction.configurationNodeNotExisting) { + transaction.putConfigurationData(path, nodeWithId) } transaction.commit().get(); mountService = session.getService(MountProvisionService); - mountInstance = mountService.createOrGetMountPoint(path); + mountInstance = mountService?.createOrGetMountPoint(path); } - + def getNodeWithId() { - val id = new SimpleNodeTOImpl(INVENTORY_ID,null,name); - return new CompositeNodeTOImpl(INVENTORY_NODE,null,Collections.singletonList(id)); + val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name); + return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id)); } - + def boolean configurationNodeNotExisting(DataModificationTransaction transaction) { return null === transaction.readConfigurationData(path); } - + def boolean operationalNodeNotExisting(DataModificationTransaction transaction) { return null === transaction.readOperationalData(path); } @@ -133,9 +230,9 @@ class NetconfDevice implements } else if (current instanceof CompositeNode) { val currentComposite = (current as CompositeNode); - current = currentComposite.getFirstCompositeByName(arg.nodeType); + current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision()); if (current == null) { - current = currentComposite.getFirstSimpleByName(arg.nodeType); + current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision()); } if (current == null) { return null; @@ -149,6 +246,25 @@ class NetconfDevice implements throw new UnsupportedOperationException("TODO: auto-generated method stub") } + def getInitialCapabilities() { + val capabilities = client?.capabilities; + if (capabilities == null) { + return null; + } + if (cachedCapabilities == null) { + cachedCapabilities = FluentIterable.from(capabilities).filter[ + contains("?") && contains("module=") && contains("revision=")].transform [ + val parts = split("\\?"); + val namespace = parts.get(0); + val queryParams = FluentIterable.from(parts.get(1).split("&")); + val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", ""); + val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", ""); + return QName.create(namespace, revision, moduleName); + ].toSet(); + } + return cachedCapabilities; + } + override close() { confReaderReg?.close() operReaderReg?.close() @@ -156,3 +272,165 @@ class NetconfDevice implements } } + +package class NetconfDeviceListener extends NetconfClientSessionListener { + + val NetconfDevice device + val EventExecutor eventExecutor + + new(NetconfDevice device,EventExecutor eventExecutor) { + this.device = device + this.eventExecutor = eventExecutor + } + + var Promise messagePromise; + val promiseLock = new ReentrantLock; + + override onMessage(NetconfClientSession session, NetconfMessage message) { + if (isNotification(message)) { + onNotification(session, message); + } else try { + promiseLock.lock + if (messagePromise != null) { + messagePromise.setSuccess(message); + messagePromise = null; + } + } finally { + promiseLock.unlock + } + } + + /** + * Method intended to customize notification processing. + * + * @param session + * {@see + * NetconfClientSessionListener#onMessage(NetconfClientSession, + * NetconfMessage)} + * @param message + * {@see + * NetconfClientSessionListener#onMessage(NetconfClientSession, + * NetconfMessage)} + */ + def void onNotification(NetconfClientSession session, NetconfMessage message) { + device.logger.debug("Received NETCONF notification.",message); + val domNotification = message?.toCompositeNode?.notificationBody; + if(domNotification != null) { + device?.mountInstance?.publish(domNotification); + } + } + + private static def CompositeNode getNotificationBody(CompositeNode node) { + for(child : node.children) { + if(child instanceof CompositeNode) { + return child as CompositeNode; + } + } + } + + override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException { + val promise = promiseReply(); + val messageAvailable = promise.await(attempts + attemptMsDelay); + if (messageAvailable) { + try { + return promise.get(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + } + + throw new IllegalStateException("Unsuccessful after " + attempts + " attempts."); + + // throw new TimeoutException("Message was not received on time."); + } + + def Promise promiseReply() { + promiseLock.lock + try { + if (messagePromise == null) { + messagePromise = eventExecutor.newPromise(); + return messagePromise; + } + return messagePromise; + } finally { + promiseLock.unlock + } + } + + def boolean isNotification(NetconfMessage message) { + val xmle = XmlElement.fromDomDocument(message.getDocument()); + return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()); + } +} + +package class NetconfDeviceSchemaContextProvider { + + @Property + val NetconfDevice device; + + @Property + val SchemaSourceProvider sourceProvider; + + @Property + var Optional currentContext; + + new(NetconfDevice device, SchemaSourceProvider sourceProvider) { + _device = device + _sourceProvider = sourceProvider + } + + def createContextFromCapabilities(Iterable capabilities) { + + val modelsToParse = ImmutableMap.builder(); + for (cap : capabilities) { + val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision)); + if (source.present) { + modelsToParse.put(cap, source.get()); + } + } + val context = tryToCreateContext(modelsToParse.build); + currentContext = Optional.fromNullable(context); + } + + def SchemaContext tryToCreateContext(Map modelsToParse) { + val parser = new YangParserImpl(); + try { + val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values)); + val result = parser.resolveSchemaContext(models); + return result; + } catch (Exception e) { + device.logger.debug("Error occured during parsing YANG schemas", e); + return null; + } + } +} + +package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider { + + val NetconfDevice device; + + new(NetconfDevice device) { + this.device = device; + } + + override getSchemaSource(String moduleName, Optional revision) { + val it = ImmutableCompositeNode.builder() // + setQName(QName::create(NetconfState.QNAME, "get-schema")) // + addLeaf("format", "yang") + addLeaf("identifier", moduleName) + if (revision.present) { + addLeaf("version", revision.get()) + } + + device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision) + val schemaReply = device.invokeRpc(getQName(), toInstance()); + + if (schemaReply.successful) { + val schemaBody = schemaReply.result.getFirstSimpleByName( + QName::create(NetconfState.QNAME.namespace, null, "data"))?.value; + device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision); + return Optional.of(schemaBody as String); + } + return Optional.absent(); + } +}