From 1d159b16ac23b4935ed6a0df683558ab42cd6f2c Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 11 Feb 2014 08:44:46 +0100 Subject: [PATCH] BUG-372: Rework sal-netconf-connector This makes the connector truly asynchronous and able to work with backing datastore going away. It will retry connecting infintely to the backed device, too. Change-Id: I843620bf63eeade231698c22592aaec0521a09ad Signed-off-by: Robert Varga --- .../sal/connect/netconf/NetconfDevice.xtend | 159 ++++++------- .../netconf/NetconfDeviceListener.java | 210 +++++++++++++++--- ...etconfDeviceTwoPhaseCommitTransaction.java | 17 +- .../sal/connect/netconf/NetconfMapping.xtend | 47 ++-- .../NetconfRemoteSchemaSourceProvider.java | 10 +- .../connect/netconf/UncancellableFuture.java | 57 +++++ .../netconf/YangModelInputStreamAdapter.java | 104 --------- .../netconf/api/NetconfTerminationReason.java | 5 + 8 files changed, 346 insertions(+), 263 deletions(-) create mode 100644 opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java delete mode 100644 opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 3eb0472b5c..12e8b5587c 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -13,19 +13,18 @@ import io.netty.util.concurrent.EventExecutor import java.io.InputStream import java.net.InetSocketAddress import java.net.URI +import java.util.ArrayList +import java.util.Collection import java.util.Collections import java.util.List import java.util.Set import java.util.concurrent.ExecutorService -import java.util.concurrent.Future import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.md.sal.common.api.data.DataModification import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.controller.netconf.api.NetconfMessage -import org.opendaylight.controller.netconf.client.NetconfClient import org.opendaylight.controller.netconf.client.NetconfClientDispatcher -import org.opendaylight.controller.netconf.util.xml.XmlUtil import org.opendaylight.controller.sal.core.api.Broker.ProviderSession +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration import org.opendaylight.controller.sal.core.api.Provider import org.opendaylight.controller.sal.core.api.RpcImplementation import org.opendaylight.controller.sal.core.api.data.DataBrokerService @@ -45,7 +44,6 @@ import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl import org.opendaylight.yangtools.yang.model.api.SchemaContext import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider -import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext import org.slf4j.Logger @@ -55,16 +53,13 @@ import static com.google.common.base.Preconditions.* import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.* import static extension org.opendaylight.controller.sal.connect.netconf.NetconfMapping.* -import com.google.common.util.concurrent.Futures -class NetconfDevice implements Provider, // +class NetconfDevice implements Provider, // DataReader, // DataCommitHandler, // RpcImplementation, // AutoCloseable { - var NetconfClient client; - @Property var InetSocketAddress socketAddress; @@ -94,15 +89,12 @@ AutoCloseable { Registration> operReaderReg Registration> confReaderReg Registration> commitHandlerReg + List rpcReg + @Property val String name - MountProvisionService mountService - - int messegeRetryCount = 5; - - int messageTimeoutCount = 5 * 1000; - Set cachedCapabilities + MountProvisionService mountService @Property var NetconfClientDispatcher dispatcher @@ -111,11 +103,13 @@ AutoCloseable { @Property var SchemaSourceProvider remoteSourceProvider - + DataBrokerService dataBroker + var NetconfDeviceListener listener; + public new(String name) { - this.name = name; + this._name = name; this.logger = LoggerFactory.getLogger(NetconfDevice.name + "#" + name); this.path = InstanceIdentifier.builder(INVENTORY_PATH).nodeWithKey(INVENTORY_NODE, Collections.singletonMap(INVENTORY_ID, name)).toInstance; @@ -126,10 +120,11 @@ AutoCloseable { checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") checkState(eventExecutor != null, "Event executor must be set."); - val listener = new NetconfDeviceListener(this); - val task = startClientTask(dispatcher, listener) - return processingExecutor.submit(task) as Future; + listener = new NetconfDeviceListener(this); + logger.info("Starting NETCONF Client {} for address {}", name, socketAddress); + + dispatcher.createClient(socketAddress, listener, reconnectStrategy); } def Optional getSchemaContext() { @@ -139,59 +134,65 @@ AutoCloseable { return deviceContextProvider.currentContext; } - private def Runnable startClientTask(NetconfClientDispatcher dispatcher, NetconfDeviceListener listener) { - return [ | - try { - logger.info("Starting Netconf Client on: {}", socketAddress); - client = NetconfClient.clientFor(name, socketAddress, reconnectStrategy, dispatcher, listener); - logger.debug("Initial capabilities {}", initialCapabilities); - var SchemaSourceProvider delegate; - if (NetconfRemoteSchemaSourceProvider.isSupportedFor(initialCapabilities)) { - delegate = new NetconfRemoteSchemaSourceProvider(this); - } else if(client.capabilities.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.namespace.toString)) { - delegate = new NetconfRemoteSchemaSourceProvider(this); - } else { - logger.info("Netconf server {} does not support IETF Netconf Monitoring", socketAddress); - delegate = SchemaSourceProviders.noopProvider(); - } - remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); - deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); - deviceContextProvider.createContextFromCapabilities(initialCapabilities); - if (mountInstance != null && schemaContext.isPresent) { - mountInstance.schemaContext = schemaContext.get(); - val operations = schemaContext.get().operations; - for (rpc : operations) { - mountInstance.addRpcImplementation(rpc.QName, this); - } - } - updateDeviceState() - if (mountInstance != null && confReaderReg == null && operReaderReg == null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); - commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this); - } - } catch (Exception e) { - logger.error("Netconf client NOT started. ", e) + def bringDown() { + if (rpcReg != null) { + for (reg : rpcReg) { + reg.close() + } + rpcReg = null + } + confReaderReg?.close() + confReaderReg = null + operReaderReg?.close() + operReaderReg = null + commitHandlerReg?.close() + commitHandlerReg = null + + updateDeviceState(false, Collections.emptySet()) + } + + def bringUp(SchemaSourceProvider delegate, Set capabilities) { + remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); + deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); + deviceContextProvider.createContextFromCapabilities(capabilities); + if (mountInstance != null && schemaContext.isPresent) { + mountInstance.schemaContext = schemaContext.get(); + } + + updateDeviceState(true, capabilities) + + if (mountInstance != null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); + commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this); + + val rpcs = new ArrayList(); + for (rpc : mountInstance.schemaContext.operations) { + rpcs.add(mountInstance.addRpcImplementation(rpc.QName, this)); } - ] + rpcReg = rpcs + } } - private def updateDeviceState() { + private def updateDeviceState(boolean up, Set capabilities) { val transaction = dataBroker.beginTransaction val it = ImmutableCompositeNode.builder setQName(INVENTORY_NODE) addLeaf(INVENTORY_ID, name) - addLeaf(INVENTORY_CONNECTED, client.clientSession.up) + addLeaf(INVENTORY_CONNECTED, up) - logger.debug("Client capabilities {}", client.capabilities) - for (capability : client.capabilities) { + logger.debug("Client capabilities {}", capabilities) + for (capability : capabilities) { addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability) } logger.debug("Update device state transaction " + transaction.identifier + " putting operational data started.") + transaction.removeOperationalData(path) transaction.putOperationalData(path, it.toInstance) logger.debug("Update device state transaction " + transaction.identifier + " putting operational data ended.") + + // FIXME: this has to be asynchronous val transactionStatus = transaction.commit.get; if (transactionStatus.successful) { @@ -219,29 +220,8 @@ AutoCloseable { Collections.emptySet; } -// def createSubscription(String streamName) { -// val it = ImmutableCompositeNode.builder() -// QName = NETCONF_CREATE_SUBSCRIPTION_QNAME -// addLeaf("stream", streamName); -// invokeRpc(QName, toInstance()) -// } - override invokeRpc(QName rpc, CompositeNode input) { - try { - val message = rpc.toRpcMessage(input,schemaContext); - val result = sendMessageImpl(message, messegeRetryCount, messageTimeoutCount); - return Futures.immediateFuture(result.toRpcResult(rpc, schemaContext)); - } catch (Exception e) { - logger.error("Rpc was not processed correctly.", e) - throw e; - } - } - - def NetconfMessage sendMessageImpl(NetconfMessage message, int retryCount, int timeout) { - logger.debug("Send message {}",XmlUtil.toString(message.document)) - val result = client.sendMessage(message, retryCount, timeout); - NetconfMapping.checkValidReply(message, result) - return result; + return listener.sendRequest(rpc.toRpcMessage(input,schemaContext)); } override getProviderFunctionality() { @@ -284,7 +264,7 @@ AutoCloseable { return null; } else if (current instanceof CompositeNode) { val currentComposite = (current as CompositeNode); - + current = currentComposite.getFirstCompositeByName(arg.nodeType); if(current == null) { current = currentComposite.getFirstCompositeByName(arg.nodeType.withoutRevision()); @@ -303,18 +283,13 @@ AutoCloseable { } override requestCommit(DataModification modification) { - val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification); + val twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, modification, true); twoPhaseCommit.prepare() return twoPhaseCommit; } - def getInitialCapabilities() { - val capabilities = client?.capabilities; - if (capabilities == null) { - return null; - } - if (cachedCapabilities == null) { - cachedCapabilities = FluentIterable.from(capabilities).filter[ + def getCapabilities(Collection capabilities) { + return FluentIterable.from(capabilities).filter[ contains("?") && contains("module=") && contains("revision=")].transform [ val parts = split("\\?"); val namespace = parts.get(0); @@ -333,14 +308,10 @@ AutoCloseable { } return QName.create(namespace, revision, moduleName); ].toSet(); - } - return cachedCapabilities; } override close() { - confReaderReg?.close() - operReaderReg?.close() - client?.close() + bringDown() } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java index 13cd5dbcf0..d5e1d35d7d 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java @@ -7,48 +7,206 @@ */ package org.opendaylight.controller.sal.connect.netconf; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; + import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener; +import org.opendaylight.controller.netconf.api.NetconfTerminationReason; import org.opendaylight.controller.netconf.client.NetconfClientSession; +import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; +import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +class NetconfDeviceListener implements NetconfClientSessionListener { + private static final class Request { + final UncancellableFuture> future; + final NetconfMessage request; + + private Request(UncancellableFuture> future, NetconfMessage request) { + this.future = future; + this.request = request; + } + } -class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener { + private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class); + private final Queue requests = new ArrayDeque<>(); private final NetconfDevice device; + private NetconfClientSession session; public NetconfDeviceListener(final NetconfDevice device) { this.device = Preconditions.checkNotNull(device); } - /** - * Method intended to customize notification processing. - * - * @param session - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - * @param message - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - */ @Override - public void onNotification(final NetconfClientSession session, final NetconfMessage message) { - this.device.logger.debug("Received NETCONF notification.", message); - CompositeNode domNotification = null; - if (message != null) { - domNotification = NetconfMapping.toNotificationNode(message, device.getSchemaContext()); - } - if (domNotification != null) { - MountProvisionInstance _mountInstance = null; - if (this.device != null) { - _mountInstance = this.device.getMountInstance(); + public synchronized void onSessionUp(final NetconfClientSession session) { + LOG.debug("Session with {} established as address {} session-id {}", + device.getName(), device.getSocketAddress(), session.getSessionId()); + + final Set caps = device.getCapabilities(session.getServerCapabilities()); + LOG.trace("Server {} advertized capabilities {}", device.getName(), caps); + + // Select the appropriate provider + final SchemaSourceProvider delegate; + if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) { + delegate = new NetconfRemoteSchemaSourceProvider(device); + } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) { + delegate = new NetconfRemoteSchemaSourceProvider(device); + } else { + LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName()); + delegate = SchemaSourceProviders.noopProvider(); + } + + device.bringUp(delegate, caps); + + this.session = session; + } + + private synchronized void tearDown(final Exception e) { + session = null; + + /* + * Walk all requests, check if they have been executing + * or cancelled and remove them from the queue. + */ + final Iterator it = requests.iterator(); + while (it.hasNext()) { + final Request r = it.next(); + if (r.future.isUncancellable()) { + // FIXME: add a RpcResult instead? + r.future.setException(e); + it.remove(); + } else if (r.future.isCancelled()) { + // This just does some house-cleaning + it.remove(); } - if (_mountInstance != null) { - _mountInstance.publish(domNotification); + } + + device.bringDown(); + } + + @Override + public void onSessionDown(final NetconfClientSession session, final Exception e) { + LOG.debug("Session with {} went down", device.getName(), e); + tearDown(e); + } + + @Override + public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { + LOG.debug("Session with {} terminated {}", session, reason); + tearDown(new RuntimeException(reason.getErrorMessage())); + } + + @Override + public void onMessage(final NetconfClientSession session, final NetconfMessage message) { + /* + * Dispatch between notifications and messages. Messages need to be processed + * with lock held, notifications do not. + */ + if (isNotification(message)) { + processNotification(message); + } else { + processMessage(message); + } + } + + private synchronized void processMessage(final NetconfMessage message) { + final Request r = requests.peek(); + if (r.future.isUncancellable()) { + requests.poll(); + LOG.debug("Matched {} to {}", r.request, message); + + // FIXME: this can throw exceptions, which should result + // in the future failing + NetconfMapping.checkValidReply(r.request, message); + r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()), + Collections.emptyList())); + } else { + LOG.warn("Ignoring unsolicited message", message); + } + } + + synchronized ListenableFuture> sendRequest(final NetconfMessage message) { + if (session == null) { + LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message); + return Futures.>immediateFuture(new RpcResult() { + @Override + public boolean isSuccessful() { + return false; + } + + @Override + public CompositeNode getResult() { + return null; + } + + @Override + public Collection getErrors() { + // FIXME: indicate that the session is down + return Collections.emptySet(); + } + }); + } + + final Request req = new Request(new UncancellableFuture>(true), message); + requests.add(req); + + session.sendMessage(req.request).addListener(new FutureListener() { + @Override + public void operationComplete(final Future future) throws Exception { + if (!future.isSuccess()) { + // We expect that a session down will occur at this point + LOG.debug("Failed to send request {}", req.request, future.cause()); + req.future.setException(future.cause()); + } else { + LOG.trace("Finished sending request {}", req.request); + } } + }); + + return req.future; + } + + /** + * Process an incoming notification. + * + * @param notification Notification message + */ + private void processNotification(final NetconfMessage notification) { + this.device.logger.debug("Received NETCONF notification.", notification); + CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext()); + if (domNotification == null) { + return; + } + + MountProvisionInstance mountInstance = this.device.getMountInstance(); + if (mountInstance != null) { + mountInstance.publish(domNotification); } } + + private static boolean isNotification(final NetconfMessage message) { + final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument()); + return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ; + } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java index 9ec3aa3bb0..5f14c264ed 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTwoPhaseCommitTransaction.java @@ -42,17 +42,18 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction { +class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransaction { private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTwoPhaseCommitTransaction.class); - private final NetconfDevice device; private final DataModification modification; - private final boolean candidateSupported = true; + private final NetconfDevice device; + private final boolean candidateSupported; public NetconfDeviceTwoPhaseCommitTransaction(NetconfDevice device, - DataModification modification) { - super(); - this.device = device; - this.modification = modification; + DataModification modification, + boolean candidateSupported) { + this.device = Preconditions.checkNotNull(device); + this.modification = Preconditions.checkNotNull(modification); + this.candidateSupported = candidateSupported; } void prepare() throws InterruptedException, ExecutionException { @@ -62,7 +63,6 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac for(Entry toUpdate : modification.getUpdatedConfigurationData().entrySet()) { sendMerge(toUpdate.getKey(),toUpdate.getValue()); } - } private void sendMerge(InstanceIdentifier key, CompositeNode value) throws InterruptedException, ExecutionException { @@ -80,7 +80,6 @@ public class NetconfDeviceTwoPhaseCommitTransaction implements DataCommitTransac RpcResult rpcResult = device.invokeRpc(NETCONF_EDIT_CONFIG_QNAME, builder.toInstance()).get(); Preconditions.checkState(rpcResult.isSuccessful(),"Rpc Result was unsuccessful"); - } private CompositeNodeBuilder configurationRpcBuilder() { diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend index e5a24fcf63..228a01eb4c 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfMapping.xtend @@ -39,8 +39,8 @@ class NetconfMapping { public static val NETCONF_URI = URI.create("urn:ietf:params:xml:ns:netconf:base:1.0") public static val NETCONF_MONITORING_URI = "urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring" public static val NETCONF_NOTIFICATION_URI = URI.create("urn:ietf:params:xml:ns:netconf:notification:1.0") - - + + public static val NETCONF_QNAME = QName.create(NETCONF_URI, null, "netconf"); public static val NETCONF_RPC_QNAME = QName.create(NETCONF_QNAME, "rpc"); public static val NETCONF_GET_QNAME = QName.create(NETCONF_QNAME, "get"); @@ -51,15 +51,15 @@ class NetconfMapping { public static val NETCONF_DELETE_CONFIG_QNAME = QName.create(NETCONF_QNAME, "delete-config"); public static val NETCONF_OPERATION_QNAME = QName.create(NETCONF_QNAME, "operation"); public static val NETCONF_COMMIT_QNAME = QName.create(NETCONF_QNAME, "commit"); - + public static val NETCONF_CONFIG_QNAME = QName.create(NETCONF_QNAME, "config"); public static val NETCONF_SOURCE_QNAME = QName.create(NETCONF_QNAME, "source"); public static val NETCONF_TARGET_QNAME = QName.create(NETCONF_QNAME, "target"); - + public static val NETCONF_CANDIDATE_QNAME = QName.create(NETCONF_QNAME, "candidate"); public static val NETCONF_RUNNING_QNAME = QName.create(NETCONF_QNAME, "running"); - - + + public static val NETCONF_RPC_REPLY_QNAME = QName.create(NETCONF_QNAME, "rpc-reply"); public static val NETCONF_OK_QNAME = QName.create(NETCONF_QNAME, "ok"); public static val NETCONF_DATA_QNAME = QName.create(NETCONF_QNAME, "data"); @@ -78,7 +78,7 @@ class NetconfMapping { if(identifier.path.empty) { return null; } - + for (component : identifier.path.reverseView) { val Node current = component.toNode(previous); previous = current; @@ -106,11 +106,11 @@ class NetconfMapping { } static def CompositeNode toCompositeNode(NetconfMessage message,Optional ctx) { - //TODO: implement general normalization to normalize incoming Netconf Message + //TODO: implement general normalization to normalize incoming Netconf Message // for Schema Context counterpart return null } - + static def CompositeNode toNotificationNode(NetconfMessage message,Optional ctx) { if (ctx.present) { val schemaContext = ctx.get @@ -127,56 +127,53 @@ class NetconfMapping { w3cPayload.documentElement.setAttribute("message-id", "m-" + messageId.andIncrement) return new NetconfMessage(w3cPayload); } - + def static flattenInput(CompositeNode node) { val inputQName = QName.create(node.nodeType,"input"); val input = node.getFirstCompositeByName(inputQName); if(input == null) return node; if(input instanceof CompositeNode) { - + val nodes = ImmutableList.builder() // .addAll(input.children) // .addAll(node.children.filter[nodeType != inputQName]) // .build() return ImmutableCompositeNode.create(node.nodeType,nodes); - } - + } + } static def RpcResult toRpcResult(NetconfMessage message,QName rpc,Optional context) { var CompositeNode rawRpc; if(context.present) { if(isDataRetrievalReply(rpc)) { - + val xmlData = message.document.dataSubtree val dataNodes = XmlDocumentUtils.toDomNodes(xmlData, Optional.of(context.get.dataDefinitions)) - + val it = ImmutableCompositeNode.builder() setQName(NETCONF_RPC_REPLY_QNAME) add(ImmutableCompositeNode.create(NETCONF_DATA_QNAME, dataNodes)); - + rawRpc = it.toInstance; //sys(xmlData) } else { val rpcSchema = context.get.operations.findFirst[QName == rpc] rawRpc = message.document.toCompositeNode() as CompositeNode; } - - - } else { rawRpc = message.document.toCompositeNode() as CompositeNode; } //rawRpc. return Rpcs.getRpcResult(true, rawRpc, Collections.emptySet()); } - + def static Element getDataSubtree(Document doc) { doc.getElementsByTagNameNS(NETCONF_URI.toString,"data").item(0) as Element } - + def static boolean isDataRetrievalReply(QName it) { - return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName) + return NETCONF_URI == namespace && ( localName == NETCONF_GET_CONFIG_QNAME.localName || localName == NETCONF_GET_QNAME.localName) } static def wrap(QName name, Node node) { @@ -209,12 +206,12 @@ class NetconfMapping { public static def Node toCompositeNode(Document document) { return XmlDocumentUtils.toDomNode(document) as Node } - + public static def checkValidReply(NetconfMessage input, NetconfMessage output) { val inputMsgId = input.document.documentElement.getAttribute("message-id") val outputMsgId = output.document.documentElement.getAttribute("message-id") Preconditions.checkState(inputMsgId == outputMsgId,"Rpc request and reply message IDs must be same."); - + } - + } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java index 1932726600..c734e80d9a 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfRemoteSchemaSourceProvider.java @@ -7,7 +7,7 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import java.util.Set; +import java.util.Collection; import java.util.concurrent.ExecutionException; import org.opendaylight.yangtools.yang.common.QName; @@ -19,6 +19,7 @@ import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder; import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider { @@ -27,11 +28,10 @@ class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider public static final QName GET_SCHEMA_QNAME = QName.create(IETF_NETCONF_MONITORING, "get-schema"); public static final QName GET_DATA_QNAME = QName.create(IETF_NETCONF_MONITORING, "data"); - NetconfDevice device; + private final NetconfDevice device; public NetconfRemoteSchemaSourceProvider(NetconfDevice device) { - super(); - this.device = device; + this.device = Preconditions.checkNotNull(device); } @Override @@ -73,7 +73,7 @@ class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider return null; } - public static final boolean isSupportedFor(Set capabilities) { + public static final boolean isSupportedFor(Collection capabilities) { return capabilities.contains(IETF_NETCONF_MONITORING); } } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java new file mode 100644 index 0000000000..c353f86eb6 --- /dev/null +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/UncancellableFuture.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.connect.netconf; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractFuture; + +final class UncancellableFuture extends AbstractFuture { + @GuardedBy("this") + private boolean uncancellable = false; + + public UncancellableFuture(boolean uncancellable) { + this.uncancellable = uncancellable; + } + + public synchronized boolean setUncancellable() { + if (isCancelled()) { + return false; + } + + uncancellable = true; + return true; + } + + public synchronized boolean isUncancellable() { + return uncancellable; + } + + @Override + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + if (uncancellable) { + return false; + } + + return super.cancel(mayInterruptIfRunning); + } + + @Override + public synchronized boolean set(@Nullable V value) { + Preconditions.checkState(uncancellable == true); + return super.set(value); + } + + @Override + protected boolean setException(Throwable throwable) { + Preconditions.checkState(uncancellable == true); + return super.setException(throwable); + } +} diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java deleted file mode 100644 index 23892e18bd..0000000000 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/YangModelInputStreamAdapter.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.connect.netconf; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.opendaylight.yangtools.concepts.Delegator; -import org.opendaylight.yangtools.yang.common.QName; - -import com.google.common.base.Charsets; - -/** - * - * - */ -public class YangModelInputStreamAdapter extends InputStream implements Delegator { - - final String source; - final QName moduleIdentifier; - final InputStream delegate; - - private YangModelInputStreamAdapter(String source, QName moduleIdentifier, InputStream delegate) { - super(); - this.source = source; - this.moduleIdentifier = moduleIdentifier; - this.delegate = delegate; - } - - @Override - public int read() throws IOException { - return delegate.read(); - } - - @Override - public int hashCode() { - return delegate.hashCode(); - } - - @Override - public int read(byte[] b) throws IOException { - return delegate.read(b); - } - - @Override - public boolean equals(Object obj) { - return delegate.equals(obj); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return delegate.read(b, off, len); - } - - @Override - public long skip(long n) throws IOException { - return delegate.skip(n); - } - - @Override - public int available() throws IOException { - return delegate.available(); - } - - @Override - public void close() throws IOException { - delegate.close(); - } - - @Override - public void mark(int readlimit) { - delegate.mark(readlimit); - } - - @Override - public void reset() throws IOException { - delegate.reset(); - } - - @Override - public boolean markSupported() { - return delegate.markSupported(); - } - - @Override - public InputStream getDelegate() { - return delegate; - } - - @Override - public String toString() { - return "YangModelInputStreamAdapter [moduleIdentifier=" + moduleIdentifier + ", delegate=" + delegate + "]"; - } - - public static YangModelInputStreamAdapter create(QName name, String module) { - return new YangModelInputStreamAdapter(null, name, new ByteArrayInputStream(module.getBytes(Charsets.UTF_8))); - } -} diff --git a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfTerminationReason.java b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfTerminationReason.java index 9de3071060..a15f9e0925 100644 --- a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfTerminationReason.java +++ b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfTerminationReason.java @@ -22,4 +22,9 @@ public class NetconfTerminationReason implements TerminationReason { public String getErrorMessage() { return reason; } + + @Override + public String toString() { + return reason; + } } -- 2.36.6