BUG-372: Rework sal-netconf-connector
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.xtend
index 3eb0472b5c5bf3a4b1079e731875529f072da7d0..12e8b5587caa230cb693610d21f8135f61e9e2e9 100644 (file)
@@ -13,19 +13,18 @@ import io.netty.util.concurrent.EventExecutor
 import java.io.InputStream
 import java.net.InetSocketAddress
 import java.net.URI
+import java.util.ArrayList
+import java.util.Collection
 import java.util.Collections
 import java.util.List
 import java.util.Set
 import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
 import org.opendaylight.controller.md.sal.common.api.data.DataModification
 import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import org.opendaylight.controller.netconf.client.NetconfClient
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.controller.netconf.util.xml.XmlUtil
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
 import org.opendaylight.controller.sal.core.api.Provider
 import org.opendaylight.controller.sal.core.api.RpcImplementation
 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
@@ -45,7 +44,6 @@ import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
 import org.opendaylight.yangtools.yang.model.api.SchemaContext
 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
 import org.slf4j.Logger
@@ -55,16 +53,13 @@ import static com.google.common.base.Preconditions.*
 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.*
 
 import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
-import com.google.common.util.concurrent.Futures
 
-class NetconfDevice implements Provider, // 
+class NetconfDevice implements Provider, //
 DataReader<InstanceIdentifier, CompositeNode>, //
 DataCommitHandler<InstanceIdentifier, CompositeNode>, //
 RpcImplementation, //
 AutoCloseable {
 
-    var NetconfClient client;
-
     @Property
     var InetSocketAddress socketAddress;
 
@@ -94,15 +89,12 @@ AutoCloseable {
     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
+    List<RpcRegistration> rpcReg
 
+    @Property
     val String name
-    MountProvisionService mountService
-
-    int messegeRetryCount = 5;
-
-    int messageTimeoutCount = 5 * 1000;
 
-    Set<QName> cachedCapabilities
+    MountProvisionService mountService
 
     @Property
     var NetconfClientDispatcher dispatcher
@@ -111,11 +103,13 @@ AutoCloseable {
 
     @Property
     var SchemaSourceProvider<InputStream> remoteSourceProvider
-    
+
     DataBrokerService dataBroker
 
+    var NetconfDeviceListener listener;
+
     public new(String name) {
-        this.name = name;
+        this._name = name;
         this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
         this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
             Collections.singletonMap(INVENTORY_ID, name)).toInstance;
@@ -126,10 +120,11 @@ AutoCloseable {
         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
         checkState(eventExecutor != null, "Event executor must be set.");
 
-        val listener = new NetconfDeviceListener(this);
-        val task = startClientTask(dispatcher, listener)
-        return processingExecutor.submit(task) as Future<Void>;
+        listener = new NetconfDeviceListener(this);
 
+        logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
+
+        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
     }
 
     def Optional<SchemaContext> getSchemaContext() {
@@ -139,59 +134,65 @@ AutoCloseable {
         return deviceContextProvider.currentContext;
     }
 
-    private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
-        return [ |
-            try {
-                logger.info("Starting Netconf Client on: {}", socketAddress);
-                client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
-                logger.debug("Initial capabilities {}", initialCapabilities);
-                var SchemaSourceProvider<String> delegate;
-                if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
-                    delegate = new NetconfRemoteSchemaSourceProvider(this);
-                }  else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
-                    delegate = new NetconfRemoteSchemaSourceProvider(this);
-                } else {
-                    logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
-                    delegate = SchemaSourceProviders.<String>noopProvider();
-                }
-                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-                deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
-                deviceContextProvider.createContextFromCapabilities(initialCapabilities);
-                if (mountInstance != null && schemaContext.isPresent) {
-                    mountInstance.schemaContext = schemaContext.get();
-                    val operations = schemaContext.get().operations;
-                    for (rpc : operations) {
-                        mountInstance.addRpcImplementation(rpc.QName, this);
-                    }
-                }
-                updateDeviceState()
-                if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
-                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
-                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
-                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
-                }
-            } catch (Exception e) {
-                logger.error("Netconf client NOT started. ", e)
+    def bringDown() {
+        if (rpcReg != null) {
+            for (reg : rpcReg) {
+                reg.close()
+            }
+            rpcReg = null
+        }
+        confReaderReg?.close()
+        confReaderReg = null
+        operReaderReg?.close()
+        operReaderReg = null
+        commitHandlerReg?.close()
+        commitHandlerReg = null
+
+        updateDeviceState(false, Collections.emptySet())
+    }
+
+    def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
+        remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+        deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+        deviceContextProvider.createContextFromCapabilities(capabilities);
+        if (mountInstance != null && schemaContext.isPresent) {
+            mountInstance.schemaContext = schemaContext.get();
+        }
+
+        updateDeviceState(true, capabilities)
+
+        if (mountInstance != null) {
+            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
+            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+
+            val rpcs = new ArrayList<RpcRegistration>();
+            for (rpc : mountInstance.schemaContext.operations) {
+                rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
             }
-        ]
+            rpcReg = rpcs
+        }
     }
 
-    private def updateDeviceState() {
+    private def updateDeviceState(boolean up, Set<QName> capabilities) {
         val transaction = dataBroker.beginTransaction
 
         val it = ImmutableCompositeNode.builder
         setQName(INVENTORY_NODE)
         addLeaf(INVENTORY_ID, name)
-        addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
+        addLeaf(INVENTORY_CONNECTED, up)
 
-        logger.debug("Client capabilities {}", client.capabilities)
-        for (capability : client.capabilities) {
+        logger.debug("Client capabilities {}", capabilities)
+        for (capability : capabilities) {
             addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
         }
 
         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
+        transaction.removeOperationalData(path)
         transaction.putOperationalData(path, it.toInstance)
         logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
+
+        // FIXME: this has to be asynchronous
         val transactionStatus = transaction.commit.get;
 
         if (transactionStatus.successful) {
@@ -219,29 +220,8 @@ AutoCloseable {
         Collections.emptySet;
     }
 
-//    def createSubscription(String streamName) {
-//        val it = ImmutableCompositeNode.builder()
-//        QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
-//        addLeaf("stream", streamName);
-//        invokeRpc(QName, toInstance())
-//    }
-
     override invokeRpc(QName rpc, CompositeNode input) {
-        try {
-            val message = rpc.toRpcMessage(input,schemaContext);
-            val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
-            return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext));
-        } catch (Exception e) {
-            logger.error("Rpc was not processed correctly.", e)
-            throw e;
-        }
-    }
-
-    def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
-        logger.debug("Send message {}",XmlUtil.toString(message.document))
-        val result = client.sendMessage(message, retryCount, timeout);
-        NetconfMapping.checkValidReply(message, result)
-        return result;
+        return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
     }
 
     override getProviderFunctionality() {
@@ -284,7 +264,7 @@ AutoCloseable {
                 return null;
             } else if (current instanceof CompositeNode) {
                 val currentComposite = (current as CompositeNode);
-                
+
                 current = currentComposite.getFirstCompositeByName(arg.nodeType);
                 if(current == null) {
                     current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
@@ -303,18 +283,13 @@ AutoCloseable {
     }
 
     override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
-        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
+        val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
         twoPhaseCommit.prepare()
         return twoPhaseCommit;
     }
 
-    def getInitialCapabilities() {
-        val capabilities = client?.capabilities;
-        if (capabilities == null) {
-            return null;
-        }
-        if (cachedCapabilities == null) {
-            cachedCapabilities = FluentIterable.from(capabilities).filter[
+    def getCapabilities(Collection<String> capabilities) {
+        return FluentIterable.from(capabilities).filter[
                 contains("?") && contains("module=") && contains("revision=")].transform [
                 val parts = split("\\?");
                 val namespace = parts.get(0);
@@ -333,14 +308,10 @@ AutoCloseable {
                 }
                 return QName.create(namespace, revision, moduleName);
             ].toSet();
-        }
-        return cachedCapabilities;
     }
 
     override close() {
-        confReaderReg?.close()
-        operReaderReg?.close()
-        client?.close()
+        bringDown()
     }
 }