X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDeviceListener.java;h=d5e1d35d7d5721e06b9b515822813b54feac779f;hp=13cd5dbcf03a185ce99a5631684696bf74ef3ab5;hb=1d159b16ac23b4935ed6a0df683558ab42cd6f2c;hpb=ced3f70572c426af15515ee27b979ff05c26e1aa diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java index 13cd5dbcf0..d5e1d35d7d 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java @@ -7,48 +7,206 @@ */ package org.opendaylight.controller.sal.connect.netconf; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; + import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener; +import org.opendaylight.controller.netconf.api.NetconfTerminationReason; import org.opendaylight.controller.netconf.client.NetconfClientSession; +import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; +import org.opendaylight.controller.netconf.util.xml.XmlElement; +import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; +import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +class NetconfDeviceListener implements NetconfClientSessionListener { + private static final class Request { + final UncancellableFuture> future; + final NetconfMessage request; + + private Request(UncancellableFuture> future, NetconfMessage request) { + this.future = future; + this.request = request; + } + } -class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener { + private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class); + private final Queue requests = new ArrayDeque<>(); private final NetconfDevice device; + private NetconfClientSession session; public NetconfDeviceListener(final NetconfDevice device) { this.device = Preconditions.checkNotNull(device); } - /** - * Method intended to customize notification processing. - * - * @param session - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - * @param message - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - */ @Override - public void onNotification(final NetconfClientSession session, final NetconfMessage message) { - this.device.logger.debug("Received NETCONF notification.", message); - CompositeNode domNotification = null; - if (message != null) { - domNotification = NetconfMapping.toNotificationNode(message, device.getSchemaContext()); - } - if (domNotification != null) { - MountProvisionInstance _mountInstance = null; - if (this.device != null) { - _mountInstance = this.device.getMountInstance(); + public synchronized void onSessionUp(final NetconfClientSession session) { + LOG.debug("Session with {} established as address {} session-id {}", + device.getName(), device.getSocketAddress(), session.getSessionId()); + + final Set caps = device.getCapabilities(session.getServerCapabilities()); + LOG.trace("Server {} advertized capabilities {}", device.getName(), caps); + + // Select the appropriate provider + final SchemaSourceProvider delegate; + if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) { + delegate = new NetconfRemoteSchemaSourceProvider(device); + } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) { + delegate = new NetconfRemoteSchemaSourceProvider(device); + } else { + LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName()); + delegate = SchemaSourceProviders.noopProvider(); + } + + device.bringUp(delegate, caps); + + this.session = session; + } + + private synchronized void tearDown(final Exception e) { + session = null; + + /* + * Walk all requests, check if they have been executing + * or cancelled and remove them from the queue. + */ + final Iterator it = requests.iterator(); + while (it.hasNext()) { + final Request r = it.next(); + if (r.future.isUncancellable()) { + // FIXME: add a RpcResult instead? + r.future.setException(e); + it.remove(); + } else if (r.future.isCancelled()) { + // This just does some house-cleaning + it.remove(); } - if (_mountInstance != null) { - _mountInstance.publish(domNotification); + } + + device.bringDown(); + } + + @Override + public void onSessionDown(final NetconfClientSession session, final Exception e) { + LOG.debug("Session with {} went down", device.getName(), e); + tearDown(e); + } + + @Override + public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { + LOG.debug("Session with {} terminated {}", session, reason); + tearDown(new RuntimeException(reason.getErrorMessage())); + } + + @Override + public void onMessage(final NetconfClientSession session, final NetconfMessage message) { + /* + * Dispatch between notifications and messages. Messages need to be processed + * with lock held, notifications do not. + */ + if (isNotification(message)) { + processNotification(message); + } else { + processMessage(message); + } + } + + private synchronized void processMessage(final NetconfMessage message) { + final Request r = requests.peek(); + if (r.future.isUncancellable()) { + requests.poll(); + LOG.debug("Matched {} to {}", r.request, message); + + // FIXME: this can throw exceptions, which should result + // in the future failing + NetconfMapping.checkValidReply(r.request, message); + r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()), + Collections.emptyList())); + } else { + LOG.warn("Ignoring unsolicited message", message); + } + } + + synchronized ListenableFuture> sendRequest(final NetconfMessage message) { + if (session == null) { + LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message); + return Futures.>immediateFuture(new RpcResult() { + @Override + public boolean isSuccessful() { + return false; + } + + @Override + public CompositeNode getResult() { + return null; + } + + @Override + public Collection getErrors() { + // FIXME: indicate that the session is down + return Collections.emptySet(); + } + }); + } + + final Request req = new Request(new UncancellableFuture>(true), message); + requests.add(req); + + session.sendMessage(req.request).addListener(new FutureListener() { + @Override + public void operationComplete(final Future future) throws Exception { + if (!future.isSuccess()) { + // We expect that a session down will occur at this point + LOG.debug("Failed to send request {}", req.request, future.cause()); + req.future.setException(future.cause()); + } else { + LOG.trace("Finished sending request {}", req.request); + } } + }); + + return req.future; + } + + /** + * Process an incoming notification. + * + * @param notification Notification message + */ + private void processNotification(final NetconfMessage notification) { + this.device.logger.debug("Received NETCONF notification.", notification); + CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext()); + if (domNotification == null) { + return; + } + + MountProvisionInstance mountInstance = this.device.getMountInstance(); + if (mountInstance != null) { + mountInstance.publish(domNotification); } } + + private static boolean isNotification(final NetconfMessage message) { + final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument()); + return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ; + } }