Merge "Bug 809: Enhancements to the toaster example"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDeviceListener.java
index 8623d90fe5465600fff4cf31253a52b4da57a5dc..68667f0143489cf4f8f633b641224e68d5ca73e8 100644 (file)
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Promise;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
 
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.eclipse.xtext.xbase.lib.Exceptions;
-import org.eclipse.xtext.xbase.lib.Functions.Function0;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
-import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.controller.sal.connect.netconf.NetconfMapping;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.w3c.dom.Document;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
-@SuppressWarnings("all")
-class NetconfDeviceListener extends NetconfClientSessionListener {
+public class NetconfDeviceListener implements NetconfClientSessionListener {
+    private static final class Request {
+        final UncancellableFuture<RpcResult<CompositeNode>> future;
+        final NetconfMessage request;
+        final QName rpc;
+
+        private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request, final QName rpc) {
+            this.future = future;
+            this.request = request;
+            this.rpc = rpc;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
+    private final Queue<Request> requests = new ArrayDeque<>();
     private final NetconfDevice device;
-    private final EventExecutor eventExecutor;
+    private NetconfClientSession session;
 
-    public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) {
-        this.device = device;
-        this.eventExecutor = eventExecutor;
+    public NetconfDeviceListener(final NetconfDevice device) {
+        this.device = Preconditions.checkNotNull(device);
     }
 
-    private Promise<NetconfMessage> messagePromise;
-    private ConcurrentMap<String, Promise<NetconfMessage>> promisedMessages;
+    @Override
+    public synchronized void onSessionUp(final NetconfClientSession session) {
+        LOG.debug("Session with {} established as address {} session-id {}",
+                device.getName(), device.getSocketAddress(), session.getSessionId());
 
-    private final ReentrantLock promiseLock = new ReentrantLock();
+        this.session = session;
 
-    public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
-        if (isNotification(message)) {
-            this.onNotification(session, message);
+        final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
+        LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
+
+        // Select the appropriate provider
+        final SchemaSourceProvider<String> delegate;
+        if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
+            delegate = new NetconfRemoteSchemaSourceProvider(device);
+            // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
+        } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+            delegate = new NetconfRemoteSchemaSourceProvider(device);
         } else {
-            try {
-                this.promiseLock.lock();
-                boolean _notEquals = (!Objects.equal(this.messagePromise, null));
-                if (_notEquals) {
-                    this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message);
-                    this.messagePromise.setSuccess(message);
-                    this.messagePromise = null;
-                }
-            } finally {
-                this.promiseLock.unlock();
-            }
+            LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
+            delegate = SchemaSourceProviders.noopProvider();
         }
+
+        device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
+
     }
 
-    /**
-     * Method intended to customize notification processing.
-     * 
-     * @param session
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     * @param message
-     *            {@see
-     *            NetconfClientSessionListener#onMessage(NetconfClientSession,
-     *            NetconfMessage)}
-     */
-    public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
-        this.device.logger.debug("Received NETCONF notification.", message);
-        CompositeNode _notificationBody = null;
-        CompositeNode _compositeNode = null;
-        if (message != null) {
-            _compositeNode = NetconfMapping.toCompositeNode(message,device.getSchemaContext());
-        }
-        if (_compositeNode != null) {
-            _notificationBody = NetconfDeviceListener.getNotificationBody(_compositeNode);
-        }
-        final CompositeNode domNotification = _notificationBody;
-        boolean _notEquals = (!Objects.equal(domNotification, null));
-        if (_notEquals) {
-            MountProvisionInstance _mountInstance = null;
-            if (this.device != null) {
-                _mountInstance = this.device.getMountInstance();
-            }
-            if (_mountInstance != null) {
-                _mountInstance.publish(domNotification);
+    private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
+        // TODO rollback capability cannot be searched for in Set<QName> caps
+        // since this set does not contain module-less capabilities
+        return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
+    }
+
+    private synchronized void tearDown(final Exception e) {
+        session = null;
+
+        /*
+         * Walk all requests, check if they have been executing
+         * or cancelled and remove them from the queue.
+         */
+        final Iterator<Request> it = requests.iterator();
+        while (it.hasNext()) {
+            final Request r = it.next();
+            if (r.future.isUncancellable()) {
+                // FIXME: add a RpcResult instead?
+                r.future.setException(e);
+                it.remove();
+            } else if (r.future.isCancelled()) {
+                // This just does some house-cleaning
+                it.remove();
             }
         }
+
+        device.bringDown();
     }
 
-    private static CompositeNode getNotificationBody(final CompositeNode node) {
-        List<Node<? extends Object>> _children = node.getChildren();
-        for (final Node<? extends Object> child : _children) {
-            if ((child instanceof CompositeNode)) {
-                return ((CompositeNode) child);
-            }
+    @Override
+    public void onSessionDown(final NetconfClientSession session, final Exception e) {
+        LOG.debug("Session with {} went down", device.getName(), e);
+        tearDown(e);
+    }
+
+    @Override
+    public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+        LOG.debug("Session with {} terminated {}", session, reason);
+        tearDown(new RuntimeException(reason.getErrorMessage()));
+    }
+
+    @Override
+    public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+        /*
+         * Dispatch between notifications and messages. Messages need to be processed
+         * with lock held, notifications do not.
+         */
+        if (isNotification(message)) {
+            processNotification(message);
+        } else {
+            processMessage(message);
         }
-        return null;
     }
 
-    public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
-        final Promise<NetconfMessage> promise = this.promiseReply();
-        this.device.logger.debug("Waiting for reply {}", promise);
-        int _plus = (attempts * attemptMsDelay);
-        final boolean messageAvailable = promise.await(_plus);
-        if (messageAvailable) {
+    private synchronized void processMessage(final NetconfMessage message) {
+        final Request r = requests.peek();
+        if (r.future.isUncancellable()) {
+            requests.poll();
+            LOG.debug("Matched {} to {}", r.request, message);
+
+            try {
+                NetconfMapping.checkValidReply(r.request, message);
+            } catch (IllegalStateException e) {
+                LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
+                r.future.setException(e);
+                return;
+            }
+
             try {
-                try {
-                    return promise.get();
-                } catch (Throwable _e) {
-                    throw Exceptions.sneakyThrow(_e);
+                NetconfMapping.checkSuccessReply(message);
+            } catch (NetconfDocumentedException | IllegalStateException e) {
+                LOG.warn("Error reply from remote device", e);
+                r.future.setException(e);
+                return;
+            }
+
+            r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext()));
+        } else {
+            LOG.warn("Ignoring unsolicited message", message);
+        }
+    }
+
+    synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message, final QName rpc) {
+        if (session == null) {
+            LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
+            return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
+                @Override
+                public boolean isSuccessful() {
+                    return false;
                 }
-            } catch (final Throwable _t) {
-                if (_t instanceof ExecutionException) {
-                    final ExecutionException e = (ExecutionException) _t;
-                    IllegalStateException _illegalStateException = new IllegalStateException(e);
-                    throw _illegalStateException;
+
+                @Override
+                public CompositeNode getResult() {
+                    return null;
+                }
+
+                @Override
+                public Collection<RpcError> getErrors() {
+                    // FIXME: indicate that the session is down
+                    return Collections.emptySet();
+                }
+            });
+        }
+
+        final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message, rpc);
+        requests.add(req);
+
+        session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+            @Override
+            public void operationComplete(final Future<Void> future) throws Exception {
+                if (!future.isSuccess()) {
+                    // We expect that a session down will occur at this point
+                    LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause());
+                    req.future.setException(future.cause());
                 } else {
-                    throw Exceptions.sneakyThrow(_t);
+                    LOG.trace("Finished sending request {}", req.request);
                 }
             }
-        }
-        String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts));
-        String _plus_2 = (_plus_1 + " attempts.");
-        IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2);
-        throw _illegalStateException_1;
+        });
+
+        return req.future;
     }
 
-    public synchronized Promise<NetconfMessage> promiseReply() {
-        this.device.logger.debug("Promising reply.");
-        this.promiseLock.lock();
-        try {
-            boolean _equals = Objects.equal(this.messagePromise, null);
-            if (_equals) {
-                Promise<NetconfMessage> _newPromise = this.eventExecutor.<NetconfMessage> newPromise();
-                this.messagePromise = _newPromise;
-                return this.messagePromise;
-            }
-            return this.messagePromise;
-        } finally {
-            this.promiseLock.unlock();
+    /**
+     * Process an incoming notification.
+     *
+     * @param notification Notification message
+     */
+    private void processNotification(final NetconfMessage notification) {
+        this.device.logger.debug("Received NETCONF notification.", notification);
+        CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
+        if (domNotification == null) {
+            return;
+        }
+
+        MountProvisionInstance mountInstance =  this.device.getMountInstance();
+        if (mountInstance != null) {
+            mountInstance.publish(domNotification);
         }
     }
 
-    public boolean isNotification(final NetconfMessage message) {
-        Document _document = message.getDocument();
-        final XmlElement xmle = XmlElement.fromDomDocument(_document);
-        String _name = xmle.getName();
-        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name);
+    private static boolean isNotification(final NetconfMessage message) {
+        final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
+        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
     }
 }