Fixed advanced Netconf client functionality.
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
index 7c4bf5facad6a3dd94b0f1bd6a73e301a7820143..bfe352ad41322cf78404426a5582afc22a4e074d 100644 (file)
@@ -25,13 +25,43 @@ import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
 import org.opendaylight.protocol.framework.ReconnectStrategy
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
 import org.opendaylight.controller.md.sal.common.api.data.DataModification
+import com.google.common.collect.FluentIterable
+import org.opendaylight.yangtools.yang.model.api.SchemaContext
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
+import java.io.InputStream
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
+import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener
+import org.opendaylight.controller.netconf.client.NetconfClientSession
+import org.opendaylight.controller.netconf.api.NetconfMessage
+import io.netty.util.concurrent.EventExecutor
 
-class NetconfDevice implements 
-    Provider, // 
-    DataReader<InstanceIdentifier, CompositeNode>, //
-    DataCommitHandler<InstanceIdentifier, CompositeNode>, //
-    RpcImplementation, //
-    AutoCloseable {
+import java.util.Map
+import java.util.Set
+import com.google.common.collect.ImmutableMap
+
+import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
+import com.google.common.base.Optional
+import com.google.common.collect.ImmutableList
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
+import static com.google.common.base.Preconditions.*;
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener
+import io.netty.util.concurrent.Promise
+import org.opendaylight.controller.netconf.util.xml.XmlElement
+import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.locks.ReentrantLock
+
+class NetconfDevice implements Provider, // 
+DataReader<InstanceIdentifier, CompositeNode>, //
+DataCommitHandler<InstanceIdentifier, CompositeNode>, //
+RpcImplementation, //
+AutoCloseable {
 
     var NetconfClient client;
 
@@ -41,35 +71,97 @@ class NetconfDevice implements
     @Property
     var MountProvisionInstance mountInstance;
 
+    @Property
+    var EventExecutor eventExecutor;
+
+    @Property
+    var ExecutorService processingExecutor;
+
     @Property
     var InstanceIdentifier path;
 
     @Property
-    var ReconnectStrategy strategy;
+    var ReconnectStrategy reconnectStrategy;
+
+    @Property
+    var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
+
+    private NetconfDeviceSchemaContextProvider schemaContextProvider
+
+    protected val Logger logger
 
     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
-    
+
     val String name
     MountProvisionService mountService
+
+    int messegeRetryCount = 5;
+
+    int messageTimeoutCount = 5 * 1000;
+
+    Set<QName> cachedCapabilities
+
+    @Property
+    var NetconfClientDispatcher dispatcher
     
-    
+    static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
+
     public new(String name) {
         this.name = name;
+        this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
     }
 
-    def start(NetconfClientDispatcher dispatcher) {
-        client = NetconfClient.clientFor(name, socketAddress, strategy, dispatcher);
-        confReaderReg = mountInstance.registerConfigurationReader(path, this);
-        operReaderReg = mountInstance.registerOperationalReader(path, this);
-        //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
+    def start() {
+        checkState(dispatcher != null, "Dispatcher must be set.");
+        checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
+        checkState(eventExecutor != null, "Event executor must be set.");
+
+        val listener = new NetconfDeviceListener(this,eventExecutor);
+        val task = startClientTask(dispatcher, listener)
+        if(mountInstance != null) {
+            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
+            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+        }
+        return processingExecutor.submit(task) as Future<Void>;
+
+    //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
+    }
+
+    def Optional<SchemaContext> getSchemaContext() {
+        if (schemaContextProvider == null) {
+            return Optional.absent();
+        }
+        return schemaContextProvider.currentContext;
+    }
+
+    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
+        return [ |
+            logger.info("Starting Netconf Client on: {}", socketAddress);
+            client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
+            logger.debug("Initial capabilities {}", initialCapabilities);
+            var SchemaSourceProvider<String> delegate;
+            if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) {
+                delegate = new NetconfDeviceSchemaSourceProvider(this);
+            } else {
+                logger.info("Device does not support IETF Netconf Monitoring.", socketAddress);
+                delegate = SchemaSourceProviders.<String>noopProvider();
+            }
+            val sourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+            schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider);
+            schemaContextProvider.createContextFromCapabilities(initialCapabilities);
+            if (mountInstance != null && schemaContext.isPresent) {
+                mountInstance.schemaContext = schemaContext.get();
+            }
+        ]
     }
 
     override readConfigurationData(InstanceIdentifier path) {
-        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME, wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
+        val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
+            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
         return data?.findNode(path) as CompositeNode;
     }
@@ -83,10 +175,17 @@ class NetconfDevice implements
     override getSupportedRpcs() {
         Collections.emptySet;
     }
+    
+    def createSubscription(String streamName) {
+        val it = ImmutableCompositeNode.builder()
+        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
+        addLeaf("stream",streamName);
+        invokeRpc(QName,toInstance())
+    }
 
     override invokeRpc(QName rpc, CompositeNode input) {
         val message = rpc.toRpcMessage(input);
-        val result = client.sendMessage(message);
+        val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount);
         return result.toRpcResult();
     }
 
@@ -96,30 +195,28 @@ class NetconfDevice implements
 
     override onSessionInitiated(ProviderSession session) {
         val dataBroker = session.getService(DataBrokerService);
-        
-        
-        
+
         val transaction = dataBroker.beginTransaction
-        if(transaction.operationalNodeNotExisting) {
-            transaction.putOperationalData(path,nodeWithId)
+        if (transaction.operationalNodeNotExisting) {
+            transaction.putOperationalData(path, nodeWithId)
         }
-        if(transaction.configurationNodeNotExisting) {
-            transaction.putConfigurationData(path,nodeWithId)
+        if (transaction.configurationNodeNotExisting) {
+            transaction.putConfigurationData(path, nodeWithId)
         }
         transaction.commit().get();
         mountService = session.getService(MountProvisionService);
-        mountInstance = mountService.createOrGetMountPoint(path);
+        mountInstance = mountService?.createOrGetMountPoint(path);
     }
-    
+
     def getNodeWithId() {
-        val id = new SimpleNodeTOImpl(INVENTORY_ID,null,name);
-        return new CompositeNodeTOImpl(INVENTORY_NODE,null,Collections.singletonList(id));
+        val id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
+        return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.singletonList(id));
     }
-    
+
     def boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
         return null === transaction.readConfigurationData(path);
     }
-    
+
     def boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
         return null === transaction.readOperationalData(path);
     }
@@ -133,9 +230,9 @@ class NetconfDevice implements
             } else if (current instanceof CompositeNode) {
                 val currentComposite = (current as CompositeNode);
 
-                current = currentComposite.getFirstCompositeByName(arg.nodeType);
+                current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
                 if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
+                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
                 }
                 if (current == null) {
                     return null;
@@ -149,6 +246,25 @@ class NetconfDevice implements
         throw new UnsupportedOperationException("TODO: auto-generated method stub")
     }
 
+    def getInitialCapabilities() {
+        val capabilities = client?.capabilities;
+        if (capabilities == null) {
+            return null;
+        }
+        if (cachedCapabilities == null) {
+            cachedCapabilities = FluentIterable.from(capabilities).filter[
+                contains("?") && contains("module=") && contains("revision=")].transform [
+                val parts = split("\\?");
+                val namespace = parts.get(0);
+                val queryParams = FluentIterable.from(parts.get(1).split("&"));
+                val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", "");
+                val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", "");
+                return QName.create(namespace, revision, moduleName);
+            ].toSet();
+        }
+        return cachedCapabilities;
+    }
+
     override close() {
         confReaderReg?.close()
         operReaderReg?.close()
@@ -156,3 +272,165 @@ class NetconfDevice implements
     }
 
 }
+
+package class NetconfDeviceListener extends NetconfClientSessionListener {
+
+    val NetconfDevice device
+    val EventExecutor eventExecutor
+
+    new(NetconfDevice device,EventExecutor eventExecutor) {
+        this.device = device
+        this.eventExecutor = eventExecutor
+    }
+
+    var Promise<NetconfMessage> messagePromise;
+    val promiseLock = new ReentrantLock;
+    
+    override onMessage(NetconfClientSession session, NetconfMessage message) {
+        if (isNotification(message)) {
+            onNotification(session, message);
+        } else try {
+            promiseLock.lock
+            if (messagePromise != null) {
+                messagePromise.setSuccess(message);
+                messagePromise = null;
+            }
+        } finally {
+            promiseLock.unlock
+        }
+    }
+
+    /**
+     * Method intended to customize notification processing.
+     * 
+     * @param session
+     *            {@see
+     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
+     *            NetconfMessage)}
+     * @param message
+     *            {@see
+     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
+     *            NetconfMessage)}
+     */
+    def void onNotification(NetconfClientSession session, NetconfMessage message) {
+        device.logger.debug("Received NETCONF notification.",message);
+        val domNotification = message?.toCompositeNode?.notificationBody;
+        if(domNotification != null) {
+            device?.mountInstance?.publish(domNotification);
+        }
+    }
+    
+    private static def CompositeNode getNotificationBody(CompositeNode node) {
+        for(child : node.children) {
+            if(child instanceof CompositeNode) {
+                return child as CompositeNode;
+            }
+        }
+    }
+
+    override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
+        val promise = promiseReply();
+        val messageAvailable = promise.await(attempts + attemptMsDelay);
+        if (messageAvailable) {
+            try {
+                return promise.get();
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
+
+    // throw new TimeoutException("Message was not received on time.");
+    }
+
+    def Promise<NetconfMessage> promiseReply() {
+        promiseLock.lock
+        try {
+        if (messagePromise == null) {
+            messagePromise = eventExecutor.newPromise();
+            return messagePromise;
+        }
+        return messagePromise;
+        } finally {
+            promiseLock.unlock
+        }
+    }
+
+    def boolean isNotification(NetconfMessage message) {
+        val xmle = XmlElement.fromDomDocument(message.getDocument());
+        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName());
+    }
+}
+
+package class NetconfDeviceSchemaContextProvider {
+
+    @Property
+    val NetconfDevice device;
+
+    @Property
+    val SchemaSourceProvider<InputStream> sourceProvider;
+
+    @Property
+    var Optional<SchemaContext> currentContext;
+
+    new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
+        _device = device
+        _sourceProvider = sourceProvider
+    }
+
+    def createContextFromCapabilities(Iterable<QName> capabilities) {
+
+        val modelsToParse = ImmutableMap.<QName, InputStream>builder();
+        for (cap : capabilities) {
+            val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
+            if (source.present) {
+                modelsToParse.put(cap, source.get());
+            }
+        }
+        val context = tryToCreateContext(modelsToParse.build);
+        currentContext = Optional.fromNullable(context);
+    }
+
+    def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
+        val parser = new YangParserImpl();
+        try {
+            val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
+            val result = parser.resolveSchemaContext(models);
+            return result;
+        } catch (Exception e) {
+            device.logger.debug("Error occured during parsing YANG schemas", e);
+            return null;
+        }
+    }
+}
+
+package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
+
+    val NetconfDevice device;
+
+    new(NetconfDevice device) {
+        this.device = device;
+    }
+
+    override getSchemaSource(String moduleName, Optional<String> revision) {
+        val it = ImmutableCompositeNode.builder() //
+        setQName(QName::create(NetconfState.QNAME, "get-schema")) //
+        addLeaf("format", "yang")
+        addLeaf("identifier", moduleName)
+        if (revision.present) {
+            addLeaf("version", revision.get())
+        }
+
+        device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
+        val schemaReply = device.invokeRpc(getQName(), toInstance());
+
+        if (schemaReply.successful) {
+            val schemaBody = schemaReply.result.getFirstSimpleByName(
+                QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
+            device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
+            return Optional.of(schemaBody as String);
+        }
+        return Optional.absent();
+    }
+}