Merge "Bug 509: Fixed incorrect merging of Data Store Writes / Events"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
index bfe352ad41322cf78404426a5582afc22a4e074d..12e8b5587caa230cb693610d21f8135f61e9e2e9 100644 (file)
@@ -1,70 +1,65 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.connect.netconf
 
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.controller.netconf.client.NetconfClient
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+import com.google.common.base.Optional
+import com.google.common.collect.FluentIterable
+import io.netty.util.concurrent.EventExecutor
+import java.io.InputStream
 import java.net.InetSocketAddress
-import org.opendaylight.yangtools.yang.data.api.Node
-import org.opendaylight.yangtools.yang.data.api.SimpleNode
-import org.opendaylight.yangtools.yang.common.QName
+import java.net.URI
+import java.util.ArrayList
+import java.util.Collection
 import java.util.Collections
+import java.util.List
+import java.util.Set
+import java.util.concurrent.ExecutorService
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
+import org.opendaylight.controller.md.sal.common.api.data.DataModification
+import org.opendaylight.controller.md.sal.common.api.data.DataReader
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.yangtools.concepts.Registration
-import org.opendaylight.controller.sal.core.api.Provider
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
-import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
-import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
+import org.opendaylight.controller.sal.core.api.Provider
+import org.opendaylight.controller.sal.core.api.RpcImplementation
 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
 import org.opendaylight.protocol.framework.ReconnectStrategy
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.data.DataModification
-import com.google.common.collect.FluentIterable
-import org.opendaylight.yangtools.yang.model.api.SchemaContext
+import org.opendaylight.yangtools.concepts.Registration
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.yangtools.yang.data.api.Node
+import org.opendaylight.yangtools.yang.data.api.SimpleNode
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfState
+import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
+import org.opendaylight.yangtools.yang.model.api.SchemaContext
+import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
-import java.io.InputStream
-import org.slf4j.LoggerFactory
+import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
 import org.slf4j.Logger
-import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener
-import org.opendaylight.controller.netconf.client.NetconfClientSession
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import io.netty.util.concurrent.EventExecutor
+import org.slf4j.LoggerFactory
 
-import java.util.Map
-import java.util.Set
-import com.google.common.collect.ImmutableMap
+import static com.google.common.base.Preconditions.*
+import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
 
-import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import com.google.common.base.Optional
-import com.google.common.collect.ImmutableList
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
-import static com.google.common.base.Preconditions.*;
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener
-import io.netty.util.concurrent.Promise
-import org.opendaylight.controller.netconf.util.xml.XmlElement
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants
-import java.util.concurrent.ExecutionException
-import java.util.concurrent.locks.ReentrantLock
-
-class NetconfDevice implements Provider, // 
+import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
+
+class NetconfDevice implements Provider, //
 DataReader<InstanceIdentifier, CompositeNode>, //
 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
 RpcImplementation, //
 AutoCloseable {
 
-    var NetconfClient client;
-
     @Property
     var InetSocketAddress socketAddress;
 
@@ -86,30 +81,35 @@ AutoCloseable {
     @Property
     var AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
 
-    private NetconfDeviceSchemaContextProvider schemaContextProvider
+    @Property
+    private NetconfDeviceSchemaContextProvider deviceContextProvider
 
     protected val Logger logger
 
     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
+    List<RpcRegistration> rpcReg
 
+    @Property
     val String name
-    MountProvisionService mountService
 
-    int messegeRetryCount = 5;
-
-    int messageTimeoutCount = 5 * 1000;
-
-    Set<QName> cachedCapabilities
+    MountProvisionService mountService
 
     @Property
     var NetconfClientDispatcher dispatcher
-    
+
     static val InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
 
+    @Property
+    var SchemaSourceProvider<InputStream> remoteSourceProvider
+
+    DataBrokerService dataBroker
+
+    var NetconfDeviceListener listener;
+
     public new(String name) {
-        this.name = name;
+        this._name = name;
         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
@@ -120,54 +120,98 @@ AutoCloseable {
         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
         checkState(eventExecutor != null, "Event executor must be set.");
 
-        val listener = new NetconfDeviceListener(this,eventExecutor);
-        val task = startClientTask(dispatcher, listener)
-        if(mountInstance != null) {
-            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
-            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
-        }
-        return processingExecutor.submit(task) as Future<Void>;
+        listener = new NetconfDeviceListener(this);
+
+        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
 
-    //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
+        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
     }
 
     def Optional<SchemaContext> getSchemaContext() {
-        if (schemaContextProvider == null) {
+        if (deviceContextProvider == null) {
             return Optional.absent();
         }
-        return schemaContextProvider.currentContext;
+        return deviceContextProvider.currentContext;
     }
 
-    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
-        return [ |
-            logger.info("Starting Netconf Client on: {}", socketAddress);
-            client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
-            logger.debug("Initial capabilities {}", initialCapabilities);
-            var SchemaSourceProvider<String> delegate;
-            if (initialCapabilities.contains(NetconfMapping.IETF_NETCONF_MONITORING_MODULE)) {
-                delegate = new NetconfDeviceSchemaSourceProvider(this);
-            } else {
-                logger.info("Device does not support IETF Netconf Monitoring.", socketAddress);
-                delegate = SchemaSourceProviders.<String>noopProvider();
+    def bringDown() {
+        if (rpcReg != null) {
+            for (reg : rpcReg) {
+                reg.close()
             }
-            val sourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-            schemaContextProvider = new NetconfDeviceSchemaContextProvider(this, sourceProvider);
-            schemaContextProvider.createContextFromCapabilities(initialCapabilities);
-            if (mountInstance != null && schemaContext.isPresent) {
-                mountInstance.schemaContext = schemaContext.get();
+            rpcReg = null
+        }
+        confReaderReg?.close()
+        confReaderReg = null
+        operReaderReg?.close()
+        operReaderReg = null
+        commitHandlerReg?.close()
+        commitHandlerReg = null
+
+        updateDeviceState(false, Collections.emptySet())
+    }
+
+    def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
+        remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+        deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+        deviceContextProvider.createContextFromCapabilities(capabilities);
+        if (mountInstance != null && schemaContext.isPresent) {
+            mountInstance.schemaContext = schemaContext.get();
+        }
+
+        updateDeviceState(true, capabilities)
+
+        if (mountInstance != null) {
+            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
+            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+
+            val rpcs = new ArrayList<RpcRegistration>();
+            for (rpc : mountInstance.schemaContext.operations) {
+                rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
             }
-        ]
+            rpcReg = rpcs
+        }
+    }
+
+    private def updateDeviceState(boolean up, Set<QName> capabilities) {
+        val transaction = dataBroker.beginTransaction
+
+        val it = ImmutableCompositeNode.builder
+        setQName(INVENTORY_NODE)
+        addLeaf(INVENTORY_ID, name)
+        addLeaf(INVENTORY_CONNECTED, up)
+
+        logger.debug("Client capabilities {}", capabilities)
+        for (capability : capabilities) {
+            addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
+        }
+
+        logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
+        transaction.removeOperationalData(path)
+        transaction.putOperationalData(path, it.toInstance)
+        logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
+
+        // FIXME: this has to be asynchronous
+        val transactionStatus = transaction.commit.get;
+
+        if (transactionStatus.successful) {
+            logger.debug("Update device state transaction " + transaction.identifier + " SUCCESSFUL.")
+        } else {
+            logger.debug("Update device state transaction " + transaction.identifier + " FAILED!")
+            logger.debug("Update device state transaction status " + transaction.status)
+        }
     }
 
     override readConfigurationData(InstanceIdentifier path) {
         val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
-            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
+            wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
         return data?.findNode(path) as CompositeNode;
     }
 
     override readOperationalData(InstanceIdentifier path) {
-        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
+        val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
         val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
         return data?.findNode(path) as CompositeNode;
     }
@@ -175,18 +219,9 @@ AutoCloseable {
     override getSupportedRpcs() {
         Collections.emptySet;
     }
-    
-    def createSubscription(String streamName) {
-        val it = ImmutableCompositeNode.builder()
-        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
-        addLeaf("stream",streamName);
-        invokeRpc(QName,toInstance())
-    }
 
     override invokeRpc(QName rpc, CompositeNode input) {
-        val message = rpc.toRpcMessage(input);
-        val result = client.sendMessage(message, messegeRetryCount, messageTimeoutCount);
-        return result.toRpcResult();
+        return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
     }
 
     override getProviderFunctionality() {
@@ -194,7 +229,7 @@ AutoCloseable {
     }
 
     override onSessionInitiated(ProviderSession session) {
-        val dataBroker = session.getService(DataBrokerService);
+        dataBroker = session.getService(DataBrokerService);
 
         val transaction = dataBroker.beginTransaction
         if (transaction.operationalNodeNotExisting) {
@@ -221,7 +256,7 @@ AutoCloseable {
         return null === transaction.readOperationalData(path);
     }
 
-    def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
+    static def Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
 
         var Node<?> current = node;
         for (arg : identifier.path) {
@@ -230,11 +265,16 @@ AutoCloseable {
             } else if (current instanceof CompositeNode) {
                 val currentComposite = (current as CompositeNode);
 
-                current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
-                if (current == null) {
-                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
+                current = currentComposite.getFirstCompositeByName(arg.nodeType);
+                if(current == null) {
+                    current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
+                }
+                if(current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.nodeType);
                 }
                 if (current == null) {
+                    current = currentComposite.getFirstSimpleByName(arg.nodeType.withoutRevision());
+                } if (current == null) {
                     return null;
                 }
             }
@@ -243,123 +283,35 @@ AutoCloseable {
     }
 
     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
-        throw new UnsupportedOperationException("TODO: auto-generated method stub")
+        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
+        twoPhaseCommit.prepare()
+        return twoPhaseCommit;
     }
 
-    def getInitialCapabilities() {
-        val capabilities = client?.capabilities;
-        if (capabilities == null) {
-            return null;
-        }
-        if (cachedCapabilities == null) {
-            cachedCapabilities = FluentIterable.from(capabilities).filter[
+    def getCapabilities(Collection<String> capabilities) {
+        return FluentIterable.from(capabilities).filter[
                 contains("?") && contains("module=") && contains("revision=")].transform [
                 val parts = split("\\?");
                 val namespace = parts.get(0);
                 val queryParams = FluentIterable.from(parts.get(1).split("&"));
-                val revision = queryParams.findFirst[startsWith("revision=")].replaceAll("revision=", "");
-                val moduleName = queryParams.findFirst[startsWith("module=")].replaceAll("module=", "");
+                var revision = queryParams.findFirst[startsWith("revision=")]?.replaceAll("revision=", "");
+                val moduleName = queryParams.findFirst[startsWith("module=")]?.replaceAll("module=", "");
+                if (revision === null) {
+                    logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
+                    revision = queryParams.findFirst[startsWith("&amp;revision=")]?.replaceAll("revision=", "");
+                    if (revision != null) {
+                        logger.warn("Netconf device returned revision incorectly escaped for {}", it)
+                    }
+                }
+                if (revision == null) {
+                    return QName.create(URI.create(namespace), null, moduleName);
+                }
                 return QName.create(namespace, revision, moduleName);
             ].toSet();
-        }
-        return cachedCapabilities;
     }
 
     override close() {
-        confReaderReg?.close()
-        operReaderReg?.close()
-        client?.close()
-    }
-
-}
-
-package class NetconfDeviceListener extends NetconfClientSessionListener {
-
-    val NetconfDevice device
-    val EventExecutor eventExecutor
-
-    new(NetconfDevice device,EventExecutor eventExecutor) {
-        this.device = device
-        this.eventExecutor = eventExecutor
-    }
-
-    var Promise<NetconfMessage> messagePromise;
-    val promiseLock = new ReentrantLock;
-    
-    override onMessage(NetconfClientSession session, NetconfMessage message) {
-        if (isNotification(message)) {
-            onNotification(session, message);
-        } else try {
-            promiseLock.lock
-            if (messagePromise != null) {
-                messagePromise.setSuccess(message);
-                messagePromise = null;
-            }
-        } finally {
-            promiseLock.unlock
-        }
-    }
-
-    /**
-     * Method intended to customize notification processing.
-     * 
-     * @param session
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     * @param message
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     */
-    def void onNotification(NetconfClientSession session, NetconfMessage message) {
-        device.logger.debug("Received NETCONF notification.",message);
-        val domNotification = message?.toCompositeNode?.notificationBody;
-        if(domNotification != null) {
-            device?.mountInstance?.publish(domNotification);
-        }
-    }
-    
-    private static def CompositeNode getNotificationBody(CompositeNode node) {
-        for(child : node.children) {
-            if(child instanceof CompositeNode) {
-                return child as CompositeNode;
-            }
-        }
-    }
-
-    override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
-        val promise = promiseReply();
-        val messageAvailable = promise.await(attempts + attemptMsDelay);
-        if (messageAvailable) {
-            try {
-                return promise.get();
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            }
-        }
-
-        throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
-
-    // throw new TimeoutException("Message was not received on time.");
-    }
-
-    def Promise<NetconfMessage> promiseReply() {
-        promiseLock.lock
-        try {
-        if (messagePromise == null) {
-            messagePromise = eventExecutor.newPromise();
-            return messagePromise;
-        }
-        return messagePromise;
-        } finally {
-            promiseLock.unlock
-        }
-    }
-
-    def boolean isNotification(NetconfMessage message) {
-        val xmle = XmlElement.fromDomDocument(message.getDocument());
-        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName());
+        bringDown()
     }
 }
 
@@ -377,25 +329,33 @@ package class NetconfDeviceSchemaContextProvider {
     new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
         _device = device
         _sourceProvider = sourceProvider
+        _currentContext = Optional.absent();
     }
 
     def createContextFromCapabilities(Iterable<QName> capabilities) {
-
-        val modelsToParse = ImmutableMap.<QName, InputStream>builder();
-        for (cap : capabilities) {
-            val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
-            if (source.present) {
-                modelsToParse.put(cap, source.get());
-            }
+        val sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider)
+        if (!sourceContext.missingSources.empty) {
+            device.logger.warn("Sources for following models are missing {}", sourceContext.missingSources);
+        }
+        device.logger.debug("Trying to create schema context from {}", sourceContext.validSources)
+        val modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
+        if (!sourceContext.validSources.empty) {
+            val schemaContext = tryToCreateContext(modelsToParse);
+            currentContext = Optional.fromNullable(schemaContext);
+        } else {
+            currentContext = Optional.absent();
         }
-        val context = tryToCreateContext(modelsToParse.build);
-        currentContext = Optional.fromNullable(context);
+        if (currentContext.present) {
+            device.logger.debug("Schema context successfully created.");
+        }
+
     }
 
-    def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
+    def SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
         val parser = new YangParserImpl();
         try {
-            val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
+
+            val models = parser.parseYangModelsFromStreams(modelsToParse);
             val result = parser.resolveSchemaContext(models);
             return result;
         } catch (Exception e) {
@@ -404,33 +364,3 @@ package class NetconfDeviceSchemaContextProvider {
         }
     }
 }
-
-package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
-
-    val NetconfDevice device;
-
-    new(NetconfDevice device) {
-        this.device = device;
-    }
-
-    override getSchemaSource(String moduleName, Optional<String> revision) {
-        val it = ImmutableCompositeNode.builder() //
-        setQName(QName::create(NetconfState.QNAME, "get-schema")) //
-        addLeaf("format", "yang")
-        addLeaf("identifier", moduleName)
-        if (revision.present) {
-            addLeaf("version", revision.get())
-        }
-
-        device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
-        val schemaReply = device.invokeRpc(getQName(), toInstance());
-
-        if (schemaReply.successful) {
-            val schemaBody = schemaReply.result.getFirstSimpleByName(
-                QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
-            device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
-            return Optional.of(schemaBody as String);
-        }
-        return Optional.absent();
-    }
-}