X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDeviceListener.java;h=1dfc3b44d3191bf00427d9e5908ff0f68f7517e0;hp=69fe4aa1904e57ac83512400eaf6a3688a8da3ea;hb=84248dac9ed8aa37e996e39429c8aa8ece473eaf;hpb=24fa75eae25771889b94c316f55282c39795d166 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java index 69fe4aa190..1dfc3b44d3 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java @@ -7,150 +7,232 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import com.google.common.base.Objects; +import com.google.common.collect.Sets; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Promise; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; -import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.locks.ReentrantLock; - -import org.eclipse.xtext.xbase.lib.Exceptions; -import org.eclipse.xtext.xbase.lib.Functions.Function0; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.api.NetconfTerminationReason; import org.opendaylight.controller.netconf.client.NetconfClientSession; import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; -import org.opendaylight.controller.sal.connect.netconf.NetconfDevice; -import org.opendaylight.controller.sal.connect.netconf.NetconfMapping; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.w3c.dom.Document; +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider; +import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +class NetconfDeviceListener implements NetconfClientSessionListener { + + private static final class Request { + final UncancellableFuture> future; + final NetconfMessage request; + final QName rpc; + + private Request(UncancellableFuture> future, NetconfMessage request, final QName rpc) { + this.future = future; + this.request = request; + this.rpc = rpc; + } + } -@SuppressWarnings("all") -class NetconfDeviceListener extends NetconfClientSessionListener { + private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class); + private final Queue requests = new ArrayDeque<>(); private final NetconfDevice device; - private final EventExecutor eventExecutor; + private NetconfClientSession session; - public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) { - this.device = device; - this.eventExecutor = eventExecutor; + public NetconfDeviceListener(final NetconfDevice device) { + this.device = Preconditions.checkNotNull(device); } - private Promise messagePromise; - private ConcurrentMap> promisedMessages; + @Override + public synchronized void onSessionUp(final NetconfClientSession session) { + LOG.debug("Session with {} established as address {} session-id {}", + device.getName(), device.getSocketAddress(), session.getSessionId()); - private final ReentrantLock promiseLock = new ReentrantLock(); + this.session = session; - public void onMessage(final NetconfClientSession session, final NetconfMessage message) { - if (isNotification(message)) { - this.onNotification(session, message); + final Set caps = device.getCapabilities(session.getServerCapabilities()); + LOG.trace("Server {} advertized capabilities {}", device.getName(), caps); + + // Select the appropriate provider + final SchemaSourceProvider delegate; + if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) { + delegate = new NetconfRemoteSchemaSourceProvider(device); + // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites + } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) { + delegate = new NetconfRemoteSchemaSourceProvider(device); } else { - try { - this.promiseLock.lock(); - boolean _notEquals = (!Objects.equal(this.messagePromise, null)); - if (_notEquals) { - this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message); - this.messagePromise.setSuccess(message); - this.messagePromise = null; - } - } finally { - this.promiseLock.unlock(); - } + LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName()); + delegate = SchemaSourceProviders.noopProvider(); } + + device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities())); + } - /** - * Method intended to customize notification processing. - * - * @param session - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - * @param message - * {@see - * NetconfClientSessionListener#onMessage(NetconfClientSession, - * NetconfMessage)} - */ - public void onNotification(final NetconfClientSession session, final NetconfMessage message) { - this.device.logger.debug("Received NETCONF notification.", message); - CompositeNode domNotification = null; - if (message != null) { - domNotification = NetconfMapping.toNotificationNode(message, device.getSchemaContext()); - } - if (domNotification != null) { - MountProvisionInstance _mountInstance = null; - if (this.device != null) { - _mountInstance = this.device.getMountInstance(); - } - if (_mountInstance != null) { - _mountInstance.publish(domNotification); + private static boolean isRollbackSupported(final Collection serverCapabilities) { + // TODO rollback capability cannot be searched for in Set caps + // since this set does not contain module-less capabilities + return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString()); + } + + private synchronized void tearDown(final Exception e) { + session = null; + + /* + * Walk all requests, check if they have been executing + * or cancelled and remove them from the queue. + */ + final Iterator it = requests.iterator(); + while (it.hasNext()) { + final Request r = it.next(); + if (r.future.isUncancellable()) { + // FIXME: add a RpcResult instead? + r.future.setException(e); + it.remove(); + } else if (r.future.isCancelled()) { + // This just does some house-cleaning + it.remove(); } } + + device.bringDown(); } - private static CompositeNode getNotificationBody(final CompositeNode node) { - List> _children = node.getChildren(); - for (final Node child : _children) { - if ((child instanceof CompositeNode)) { - return ((CompositeNode) child); - } + @Override + public void onSessionDown(final NetconfClientSession session, final Exception e) { + LOG.debug("Session with {} went down", device.getName(), e); + tearDown(e); + } + + @Override + public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { + LOG.debug("Session with {} terminated {}", session, reason); + tearDown(new RuntimeException(reason.getErrorMessage())); + } + + @Override + public void onMessage(final NetconfClientSession session, final NetconfMessage message) { + /* + * Dispatch between notifications and messages. Messages need to be processed + * with lock held, notifications do not. + */ + if (isNotification(message)) { + processNotification(message); + } else { + processMessage(message); } - return null; } - public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException { - final Promise promise = this.promiseReply(); - this.device.logger.debug("Waiting for reply {}", promise); - int _plus = (attempts * attemptMsDelay); - final boolean messageAvailable = promise.await(_plus); - if (messageAvailable) { + private synchronized void processMessage(final NetconfMessage message) { + final Request r = requests.peek(); + if (r.future.isUncancellable()) { + requests.poll(); + LOG.debug("Matched {} to {}", r.request, message); + + try { + NetconfMapping.checkValidReply(r.request, message); + } catch (IllegalStateException e) { + LOG.warn("Invalid request-reply match, reply message contains different message-id", e); + r.future.setException(e); + return; + } + try { - try { - return promise.get(); - } catch (Throwable _e) { - throw Exceptions.sneakyThrow(_e); + NetconfMapping.checkSuccessReply(message); + } catch (NetconfDocumentedException | IllegalStateException e) { + LOG.warn("Error reply from remote device", e); + r.future.setException(e); + return; + } + + r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext())); + } else { + LOG.warn("Ignoring unsolicited message", message); + } + } + + synchronized ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { + if (session == null) { + LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message); + return Futures.>immediateFuture(new RpcResult() { + @Override + public boolean isSuccessful() { + return false; } - } catch (final Throwable _t) { - if (_t instanceof ExecutionException) { - final ExecutionException e = (ExecutionException) _t; - IllegalStateException _illegalStateException = new IllegalStateException(e); - throw _illegalStateException; + + @Override + public CompositeNode getResult() { + return null; + } + + @Override + public Collection getErrors() { + // FIXME: indicate that the session is down + return Collections.emptySet(); + } + }); + } + + final Request req = new Request(new UncancellableFuture>(true), message, rpc); + requests.add(req); + + session.sendMessage(req.request).addListener(new FutureListener() { + @Override + public void operationComplete(final Future future) throws Exception { + if (!future.isSuccess()) { + // We expect that a session down will occur at this point + LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause()); + req.future.setException(future.cause()); } else { - throw Exceptions.sneakyThrow(_t); + LOG.trace("Finished sending request {}", req.request); } } - } - String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts)); - String _plus_2 = (_plus_1 + " attempts."); - IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2); - throw _illegalStateException_1; + }); + + return req.future; } - public synchronized Promise promiseReply() { - this.device.logger.debug("Promising reply."); - this.promiseLock.lock(); - try { - boolean _equals = Objects.equal(this.messagePromise, null); - if (_equals) { - Promise _newPromise = this.eventExecutor. newPromise(); - this.messagePromise = _newPromise; - return this.messagePromise; - } - return this.messagePromise; - } finally { - this.promiseLock.unlock(); + /** + * Process an incoming notification. + * + * @param notification Notification message + */ + private void processNotification(final NetconfMessage notification) { + this.device.logger.debug("Received NETCONF notification.", notification); + CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext()); + if (domNotification == null) { + return; + } + + MountProvisionInstance mountInstance = this.device.getMountInstance(); + if (mountInstance != null) { + mountInstance.publish(domNotification); } } - public boolean isNotification(final NetconfMessage message) { - Document _document = message.getDocument(); - final XmlElement xmle = XmlElement.fromDomDocument(_document); - String _name = xmle.getName(); - return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name); + private static boolean isNotification(final NetconfMessage message) { + final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument()); + return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ; } }