Updated implementation of Netconf, fixed DOM Mountpoint
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
index bfe352ad41322cf78404426a5582afc22a4e074d..da0790c599763318412dbe630a1508d30064326e 100644 (file)
@@ -1,61 +1,55 @@
 package org.opendaylight.controller.sal.connect.netconf
 
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.controller.netconf.client.NetconfClient
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import com.google.common.base.Optional
+import com.google.common.collect.FluentIterable
+import io.netty.util.concurrent.EventExecutor
+import java.io.InputStream
 import java.net.InetSocketAddress
-import org.opendaylight.yangtools.yang.data.api.Node
-import org.opendaylight.yangtools.yang.data.api.SimpleNode
-import org.opendaylight.yangtools.yang.common.QName
+import java.net.URI
 import java.util.Collections
+import java.util.List
+import java.util.Set
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
+import org.opendaylight.controller.md.sal.common.api.data.DataModification
+import org.opendaylight.controller.md.sal.common.api.data.DataReader
+import org.opendaylight.controller.netconf.api.NetconfMessage
+import org.opendaylight.controller.netconf.client.NetconfClient
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.yangtools.concepts.Registration
-import org.opendaylight.controller.sal.core.api.Provider
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*;
+import org.opendaylight.controller.sal.core.api.Provider
+import org.opendaylight.controller.sal.core.api.RpcImplementation
 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
 import org.opendaylight.protocol.framework.ReconnectStrategy
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.data.DataModification
-import com.google.common.collect.FluentIterable
-import org.opendaylight.yangtools.yang.model.api.SchemaContext
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
+import org.opendaylight.yangtools.concepts.Registration
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.yangtools.yang.data.api.Node
+import org.opendaylight.yangtools.yang.data.api.SimpleNode
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
+import org.opendaylight.yangtools.yang.model.api.SchemaContext
+import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
+import org.opendaylight.yangtools.yang.model.util.repo.SourceIdentifier
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
-import java.io.InputStream
-import org.slf4j.LoggerFactory
+import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
 import org.slf4j.Logger
-import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener
-import org.opendaylight.controller.netconf.client.NetconfClientSession
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import io.netty.util.concurrent.EventExecutor
+import org.slf4j.LoggerFactory
 
-import java.util.Map
-import java.util.Set
-import com.google.common.collect.ImmutableMap
+import static com.google.common.base.Preconditions.*
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
 
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import com.google.common.base.Optional
-import com.google.common.collect.ImmutableList
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
-import static com.google.common.base.Preconditions.*;
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener
-import io.netty.util.concurrent.Promise
-import org.opendaylight.controller.netconf.util.xml.XmlElement
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants
-import java.util.concurrent.ExecutionException
-import java.util.concurrent.locks.ReentrantLock
+import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import org.opendaylight.controller.netconf.util.xml.XmlUtil
 
 class NetconfDevice implements Provider, // 
 DataReader<InstanceIdentifier, CompositeNode>, //
@@ -86,7 +80,8 @@ AutoCloseable {
     @Property
     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
 
-    private NetconfDeviceSchemaContextProvider schemaContextProvider
+    @Property
+    private NetconfDeviceSchemaContextProvider deviceContextProvider
 
     protected val Logger logger
 
@@ -105,9 +100,12 @@ AutoCloseable {
 
     @Property
     var NetconfClientDispatcher dispatcher
-    
+
     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
 
+    @Property
+    var SchemaSourceProvider<InputStream> remoteSourceProvider
+
     public new(String name) {
         this.name = name;
         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
@@ -120,11 +118,12 @@ AutoCloseable {
         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
         checkState(eventExecutor != null, "Event executor must be set.");
 
-        val listener = new NetconfDeviceListener(this,eventExecutor);
+        val listener = new NetconfDeviceListener(this, eventExecutor);
         val task = startClientTask(dispatcher, listener)
-        if(mountInstance != null) {
+        if (mountInstance != null) {
             confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
             operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
         }
         return processingExecutor.submit(task) as Future<Void>;
 
@@ -132,27 +131,28 @@ AutoCloseable {
     }
 
     def Optional<SchemaContext> getSchemaContext() {
-        if (schemaContextProvider == null) {
+        if (deviceContextProvider == null) {
             return Optional.absent();
         }
-        return schemaContextProvider.currentContext;
+        return deviceContextProvider.currentContext;
     }
 
     private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
+
         return [ |
             logger.info("Starting Netconf Client on: {}", socketAddress);
             client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
             logger.debug("Initial capabilities {}", initialCapabilities);
             var SchemaSourceProvider<String> delegate;
-            if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) {
-                delegate = new NetconfDeviceSchemaSourceProvider(this);
-            } else {
-                logger.info("Device does not support IETF Netconf Monitoring.", socketAddress);
+            if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
+                delegate = new NetconfRemoteSchemaSourceProvider(this);
+            }  else {
+                logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
                 delegate = SchemaSourceProviders.<String>noopProvider();
             }
-            val sourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-            schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider);
-            schemaContextProvider.createContextFromCapabilities(initialCapabilities);
+            remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+            deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+            deviceContextProvider.createContextFromCapabilities(initialCapabilities);
             if (mountInstance != null && schemaContext.isPresent) {
                 mountInstance.schemaContext = schemaContext.get();
             }
@@ -175,18 +175,31 @@ AutoCloseable {
     override getSupportedRpcs() {
         Collections.emptySet;
     }
-    
+
     def createSubscription(String streamName) {
         val it = ImmutableCompositeNode.builder()
         QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
-        addLeaf("stream",streamName);
-        invokeRpc(QName,toInstance())
+        addLeaf("stream", streamName);
+        invokeRpc(QName, toInstance())
     }
 
     override invokeRpc(QName rpc, CompositeNode input) {
-        val message = rpc.toRpcMessage(input);
-        val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount);
-        return result.toRpcResult();
+        try {
+            val message = rpc.toRpcMessage(input,schemaContext);
+            val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
+            return result.toRpcResult(rpc, schemaContext);
+
+        } catch (Exception e) {
+            logger.error("Rpc was not processed correctly.", e)
+            throw e;
+        }
+    }
+
+    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
+        logger.debug("Send message {}",XmlUtil.toString(message.document))
+        val result = client.sendMessage(message, retryCount, timeout);
+        NetconfMapping.checkValidReply(message, result)
+        return result;
     }
 
     override getProviderFunctionality() {
@@ -221,7 +234,7 @@ AutoCloseable {
         return null === transaction.readOperationalData(path);
     }
 
-    def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
+    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
 
         var Node<?> current = node;
         for (arg : identifier.path) {
@@ -229,12 +242,17 @@ AutoCloseable {
                 return null;
             } else if (current instanceof CompositeNode) {
                 val currentComposite = (current as CompositeNode);
-
-                current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
+                
+                current = currentComposite.getFirstCompositeByName(arg.nodeType);
+                if(current == null) {
+                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
+                }
+                if(current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
                 }
                 if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
+                } if (current == null) {
                     return null;
                 }
             }
@@ -243,7 +261,9 @@ AutoCloseable {
     }
 
     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
-        throw new UnsupportedOperationException("TODO: auto-generated method stub")
+        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
+        twoPhaseCommit.prepare()
+        return twoPhaseCommit;
     }
 
     def getInitialCapabilities() {
@@ -257,8 +277,18 @@ AutoCloseable {
                 val parts = split("\\?");
                 val namespace = parts.get(0);
                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
-                val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", "");
-                val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", "");
+                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
+                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
+                if (revision === null) {
+                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
+                    revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
+                    if (revision != null) {
+                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
+                    }
+                }
+                if (revision == null) {
+                    return QName.create(URI.create(namespace), null, moduleName);
+                }
                 return QName.create(namespace, revision, moduleName);
             ].toSet();
         }
@@ -273,96 +303,6 @@ AutoCloseable {
 
 }
 
-package class NetconfDeviceListener extends NetconfClientSessionListener {
-
-    val NetconfDevice device
-    val EventExecutor eventExecutor
-
-    new(NetconfDevice device,EventExecutor eventExecutor) {
-        this.device = device
-        this.eventExecutor = eventExecutor
-    }
-
-    var Promise<NetconfMessage> messagePromise;
-    val promiseLock = new ReentrantLock;
-    
-    override onMessage(NetconfClientSession session, NetconfMessage message) {
-        if (isNotification(message)) {
-            onNotification(session, message);
-        } else try {
-            promiseLock.lock
-            if (messagePromise != null) {
-                messagePromise.setSuccess(message);
-                messagePromise = null;
-            }
-        } finally {
-            promiseLock.unlock
-        }
-    }
-
-    /**
-     * Method intended to customize notification processing.
-     * 
-     * @param session
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     * @param message
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     */
-    def void onNotification(NetconfClientSession session, NetconfMessage message) {
-        device.logger.debug("Received NETCONF notification.",message);
-        val domNotification = message?.toCompositeNode?.notificationBody;
-        if(domNotification != null) {
-            device?.mountInstance?.publish(domNotification);
-        }
-    }
-    
-    private static def CompositeNode getNotificationBody(CompositeNode node) {
-        for(child : node.children) {
-            if(child instanceof CompositeNode) {
-                return child as CompositeNode;
-            }
-        }
-    }
-
-    override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
-        val promise = promiseReply();
-        val messageAvailable = promise.await(attempts + attemptMsDelay);
-        if (messageAvailable) {
-            try {
-                return promise.get();
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            }
-        }
-
-        throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
-
-    // throw new TimeoutException("Message was not received on time.");
-    }
-
-    def Promise<NetconfMessage> promiseReply() {
-        promiseLock.lock
-        try {
-        if (messagePromise == null) {
-            messagePromise = eventExecutor.newPromise();
-            return messagePromise;
-        }
-        return messagePromise;
-        } finally {
-            promiseLock.unlock
-        }
-    }
-
-    def boolean isNotification(NetconfMessage message) {
-        val xmle = XmlElement.fromDomDocument(message.getDocument());
-        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName());
-    }
-}
-
 package class NetconfDeviceSchemaContextProvider {
 
     @Property
@@ -380,22 +320,29 @@ package class NetconfDeviceSchemaContextProvider {
     }
 
     def createContextFromCapabilities(Iterable<QName> capabilities) {
-
-        val modelsToParse = ImmutableMap.<QName, InputStream>builder();
-        for (cap : capabilities) {
-            val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
-            if (source.present) {
-                modelsToParse.put(cap, source.get());
-            }
+        val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
+        if (!sourceContext.missingSources.empty) {
+            device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
+        }
+        device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
+        val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
+        if (!sourceContext.validSources.empty) {
+            val schemaContext = tryToCreateContext(modelsToParse);
+            currentContext = Optional.fromNullable(schemaContext);
+        } else {
+            currentContext = Optional.absent();
         }
-        val context = tryToCreateContext(modelsToParse.build);
-        currentContext = Optional.fromNullable(context);
+        if (currentContext.present) {
+            device.logger.debug("Schema context successfully created.");
+        }
+
     }
 
-    def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
+    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
         val parser = new YangParserImpl();
         try {
-            val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
+
+            val models = parser.parseYangModelsFromStreams(modelsToParse);
             val result = parser.resolveSchemaContext(models);
             return result;
         } catch (Exception e) {
@@ -404,33 +351,3 @@ package class NetconfDeviceSchemaContextProvider {
         }
     }
 }
-
-package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
-
-    val NetconfDevice device;
-
-    new(NetconfDevice device) {
-        this.device = device;
-    }
-
-    override getSchemaSource(String moduleName, Optional<String> revision) {
-        val it = ImmutableCompositeNode.builder() //
-        setQName(QName::create(NetconfState.QNAME, "get-schema")) //
-        addLeaf("format", "yang")
-        addLeaf("identifier", moduleName)
-        if (revision.present) {
-            addLeaf("version", revision.get())
-        }
-
-        device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
-        val schemaReply = device.invokeRpc(getQName(), toInstance());
-
-        if (schemaReply.successful) {
-            val schemaBody = schemaReply.result.getFirstSimpleByName(
-                QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
-            device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
-            return Optional.of(schemaBody as String);
-        }
-        return Optional.absent();
-    }
-}