+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.sal.connect.netconf
import com.google.common.base.Optional
import java.io.InputStream
import java.net.InetSocketAddress
import java.net.URI
+import java.util.ArrayList
+import java.util.Collection
import java.util.Collections
import java.util.List
import java.util.Set
import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
import org.opendaylight.controller.md.sal.common.api.data.DataModification
import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.controller.netconf.api.NetconfMessage
-import org.opendaylight.controller.netconf.client.NetconfClient
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher
-import org.opendaylight.controller.netconf.util.xml.XmlUtil
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
import org.opendaylight.controller.sal.core.api.Provider
import org.opendaylight.controller.sal.core.api.RpcImplementation
import org.opendaylight.controller.sal.core.api.data.DataBrokerService
import org.opendaylight.yangtools.yang.model.api.SchemaContext
import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider
import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider
-import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl
import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext
import org.slf4j.Logger
import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.*
-class NetconfDevice implements Provider, //
+class NetconfDevice implements Provider, //
DataReader<InstanceIdentifier, CompositeNode>, //
DataCommitHandler<InstanceIdentifier, CompositeNode>, //
RpcImplementation, //
AutoCloseable {
- var NetconfClient client;
-
@Property
var InetSocketAddress socketAddress;
Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg
Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg
Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg
+ List<RpcRegistration> rpcReg
+ @Property
val String name
- MountProvisionService mountService
- int messegeRetryCount = 5;
-
- int messageTimeoutCount = 5 * 1000;
-
- Set<QName> cachedCapabilities
+ MountProvisionService mountService
@Property
var NetconfClientDispatcher dispatcher
@Property
var SchemaSourceProvider<InputStream> remoteSourceProvider
-
+
DataBrokerService dataBroker
+ var NetconfDeviceListener listener;
+
public new(String name) {
- this.name = name;
+ this._name = name;
this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name);
this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE,
Collections.singletonMap(INVENTORY_ID, name)).toInstance;
checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
checkState(eventExecutor != null, "Event executor must be set.");
- val listener = new NetconfDeviceListener(this, eventExecutor);
- val task = startClientTask(dispatcher, listener)
- if (mountInstance != null) {
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
- }
- return processingExecutor.submit(task) as Future<Void>;
+ listener = new NetconfDeviceListener(this);
+
+ logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
- //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
+ dispatcher.createClient(socketAddress, listener, reconnectStrategy);
}
def Optional<SchemaContext> getSchemaContext() {
return deviceContextProvider.currentContext;
}
- private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) {
- return [ |
- try {
- logger.info("Starting Netconf Client on: {}", socketAddress);
- client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener);
- logger.debug("Initial capabilities {}", initialCapabilities);
- var SchemaSourceProvider<String> delegate;
- if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) {
- delegate = new NetconfRemoteSchemaSourceProvider(this);
- } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) {
- delegate = new NetconfRemoteSchemaSourceProvider(this);
- } else {
- logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress);
- delegate = SchemaSourceProviders.<String>noopProvider();
- }
- remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
- deviceContextProvider.createContextFromCapabilities(initialCapabilities);
- if (mountInstance != null && schemaContext.isPresent) {
- mountInstance.schemaContext = schemaContext.get();
- }
- updateDeviceState()
- if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
- confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
- operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+ def bringDown() {
+ if (rpcReg != null) {
+ for (reg : rpcReg) {
+ reg.close()
+ }
+ rpcReg = null
+ }
+ confReaderReg?.close()
+ confReaderReg = null
+ operReaderReg?.close()
+ operReaderReg = null
+ commitHandlerReg?.close()
+ commitHandlerReg = null
+
+ updateDeviceState(false, Collections.emptySet())
+ }
+
+ def bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
+ remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+ deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
+ deviceContextProvider.createContextFromCapabilities(capabilities);
+ if (mountInstance != null && schemaContext.isPresent) {
+ mountInstance.schemaContext = schemaContext.get();
+ }
+
+ updateDeviceState(true, capabilities)
+
+ if (mountInstance != null) {
+ confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
+ operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+ commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+
+ val rpcs = new ArrayList<RpcRegistration>();
+ if (mountInstance != null && schemaContext.isPresent) {
+ for (rpc : mountInstance.schemaContext.operations) {
+ rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this));
}
- } catch (Exception e) {
- logger.error("Netconf client NOT started. ", e)
}
- ]
+ rpcReg = rpcs
+ }
}
- private def updateDeviceState() {
+ private def updateDeviceState(boolean up, Set<QName> capabilities) {
val transaction = dataBroker.beginTransaction
val it = ImmutableCompositeNode.builder
setQName(INVENTORY_NODE)
addLeaf(INVENTORY_ID, name)
- addLeaf(INVENTORY_CONNECTED, client.clientSession.up)
+ addLeaf(INVENTORY_CONNECTED, up)
- logger.debug("Client capabilities {}", client.capabilities)
- for (capability : client.capabilities) {
+ logger.debug("Client capabilities {}", capabilities)
+ for (capability : capabilities) {
addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability)
}
logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.")
+ transaction.removeOperationalData(path)
transaction.putOperationalData(path, it.toInstance)
logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.")
+
+ // FIXME: this has to be asynchronous
val transactionStatus = transaction.commit.get;
if (transactionStatus.successful) {
override readConfigurationData(InstanceIdentifier path) {
val result = invokeRpc(NETCONF_GET_CONFIG_QNAME,
- wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure()));
+ wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, path.toFilterStructure())).get();
val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
return data?.findNode(path) as CompositeNode;
}
override readOperationalData(InstanceIdentifier path) {
- val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure()));
+ val result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, path.toFilterStructure())).get();
val data = result.result.getFirstCompositeByName(NETCONF_DATA_QNAME);
return data?.findNode(path) as CompositeNode;
}
Collections.emptySet;
}
- def createSubscription(String streamName) {
- val it = ImmutableCompositeNode.builder()
- QName = NETCONF_CREATE_SUBSCRIPTION_QNAME
- addLeaf("stream", streamName);
- invokeRpc(QName, toInstance())
- }
-
override invokeRpc(QName rpc, CompositeNode input) {
- try {
- val message = rpc.toRpcMessage(input,schemaContext);
- val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount);
- return result.toRpcResult(rpc, schemaContext);
-
- } catch (Exception e) {
- logger.error("Rpc was not processed correctly.", e)
- throw e;
- }
- }
-
- def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) {
- logger.debug("Send message {}",XmlUtil.toString(message.document))
- val result = client.sendMessage(message, retryCount, timeout);
- NetconfMapping.checkValidReply(message, result)
- return result;
+ return listener.sendRequest(rpc.toRpcMessage(input,schemaContext));
}
override getProviderFunctionality() {
return null;
} else if (current instanceof CompositeNode) {
val currentComposite = (current as CompositeNode);
-
+
current = currentComposite.getFirstCompositeByName(arg.nodeType);
if(current == null) {
current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision());
}
override requestCommit(DataModification<InstanceIdentifier, CompositeNode> modification) {
- val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification);
+ val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true);
twoPhaseCommit.prepare()
return twoPhaseCommit;
}
- def getInitialCapabilities() {
- val capabilities = client?.capabilities;
- if (capabilities == null) {
- return null;
- }
- if (cachedCapabilities == null) {
- cachedCapabilities = FluentIterable.from(capabilities).filter[
+ def getCapabilities(Collection<String> capabilities) {
+ return FluentIterable.from(capabilities).filter[
contains("?") && contains("module=") && contains("revision=")].transform [
val parts = split("\\?");
val namespace = parts.get(0);
}
return QName.create(namespace, revision, moduleName);
].toSet();
- }
- return cachedCapabilities;
}
override close() {
- confReaderReg?.close()
- operReaderReg?.close()
- client?.close()
+ bringDown()
}
-
}
package class NetconfDeviceSchemaContextProvider {