Merge "Fixed typo in SnapshotBackedWriteTransaction class"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDeviceListener.java
index d5e1d35d7d5721e06b9b515822813b54feac779f..68667f0143489cf4f8f633b641224e68d5ca73e8 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
+import com.google.common.collect.Sets;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 
@@ -17,12 +18,14 @@ import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
 
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -38,14 +41,16 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-class NetconfDeviceListener implements NetconfClientSessionListener {
+public class NetconfDeviceListener implements NetconfClientSessionListener {
     private static final class Request {
         final UncancellableFuture<RpcResult<CompositeNode>> future;
         final NetconfMessage request;
+        final QName rpc;
 
-        private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
+        private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request, final QName rpc) {
             this.future = future;
             this.request = request;
+            this.rpc = rpc;
         }
     }
 
@@ -63,6 +68,8 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
         LOG.debug("Session with {} established as address {} session-id {}",
                 device.getName(), device.getSocketAddress(), session.getSessionId());
 
+        this.session = session;
+
         final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
         LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
 
@@ -70,16 +77,22 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
         final SchemaSourceProvider<String> delegate;
         if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
             delegate = new NetconfRemoteSchemaSourceProvider(device);
-        } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+            // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
+        } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
             delegate = new NetconfRemoteSchemaSourceProvider(device);
         } else {
             LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
-            delegate = SchemaSourceProviders.<String>noopProvider();
+            delegate = SchemaSourceProviders.noopProvider();
         }
 
-        device.bringUp(delegate, caps);
+        device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
 
-        this.session = session;
+    }
+
+    private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
+        // TODO rollback capability cannot be searched for in Set<QName> caps
+        // since this set does not contain module-less capabilities
+        return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
     }
 
     private synchronized void tearDown(final Exception e) {
@@ -136,17 +149,29 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
             requests.poll();
             LOG.debug("Matched {} to {}", r.request, message);
 
-            // FIXME: this can throw exceptions, which should result
-            // in the future failing
-            NetconfMapping.checkValidReply(r.request, message);
-            r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
-                    Collections.<RpcError>emptyList()));
+            try {
+                NetconfMapping.checkValidReply(r.request, message);
+            } catch (IllegalStateException e) {
+                LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
+                r.future.setException(e);
+                return;
+            }
+
+            try {
+                NetconfMapping.checkSuccessReply(message);
+            } catch (NetconfDocumentedException | IllegalStateException e) {
+                LOG.warn("Error reply from remote device", e);
+                r.future.setException(e);
+                return;
+            }
+
+            r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext()));
         } else {
             LOG.warn("Ignoring unsolicited message", message);
         }
     }
 
-    synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+    synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message, final QName rpc) {
         if (session == null) {
             LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
             return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
@@ -168,7 +193,7 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
             });
         }
 
-        final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+        final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message, rpc);
         requests.add(req);
 
         session.sendMessage(req.request).addListener(new FutureListener<Void>() {
@@ -176,7 +201,7 @@ class NetconfDeviceListener implements NetconfClientSessionListener {
             public void operationComplete(final Future<Void> future) throws Exception {
                 if (!future.isSuccess()) {
                     // We expect that a session down will occur at this point
-                    LOG.debug("Failed to send request {}", req.request, future.cause());
+                    LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause());
                     req.future.setException(future.cause());
                 } else {
                     LOG.trace("Finished sending request {}", req.request);