*/
package org.opendaylight.controller.sal.connect.netconf;
+import com.google.common.collect.Sets;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.Queue;
import java.util.Set;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
import org.opendaylight.controller.netconf.client.NetconfClientSession;
import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.yangtools.yang.common.QName;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-class NetconfDeviceListener implements NetconfClientSessionListener {
+public class NetconfDeviceListener implements NetconfClientSessionListener {
private static final class Request {
final UncancellableFuture<RpcResult<CompositeNode>> future;
final NetconfMessage request;
+ final QName rpc;
- private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
+ private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request, final QName rpc) {
this.future = future;
this.request = request;
+ this.rpc = rpc;
}
}
LOG.debug("Session with {} established as address {} session-id {}",
device.getName(), device.getSocketAddress(), session.getSessionId());
+ this.session = session;
+
final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
final SchemaSourceProvider<String> delegate;
if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
delegate = new NetconfRemoteSchemaSourceProvider(device);
- } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+ // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
+ } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
delegate = new NetconfRemoteSchemaSourceProvider(device);
} else {
LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
- delegate = SchemaSourceProviders.<String>noopProvider();
+ delegate = SchemaSourceProviders.noopProvider();
}
- device.bringUp(delegate, caps);
+ device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
- this.session = session;
+ }
+
+ private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
+ // TODO rollback capability cannot be searched for in Set<QName> caps
+ // since this set does not contain module-less capabilities
+ return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
}
private synchronized void tearDown(final Exception e) {
requests.poll();
LOG.debug("Matched {} to {}", r.request, message);
- // FIXME: this can throw exceptions, which should result
- // in the future failing
- NetconfMapping.checkValidReply(r.request, message);
- r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
- Collections.<RpcError>emptyList()));
+ try {
+ NetconfMapping.checkValidReply(r.request, message);
+ } catch (IllegalStateException e) {
+ LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
+ r.future.setException(e);
+ return;
+ }
+
+ try {
+ NetconfMapping.checkSuccessReply(message);
+ } catch (NetconfDocumentedException | IllegalStateException e) {
+ LOG.warn("Error reply from remote device", e);
+ r.future.setException(e);
+ return;
+ }
+
+ r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext()));
} else {
LOG.warn("Ignoring unsolicited message", message);
}
}
- synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+ synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message, final QName rpc) {
if (session == null) {
LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
});
}
- final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+ final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message, rpc);
requests.add(req);
session.sendMessage(req.request).addListener(new FutureListener<Void>() {
public void operationComplete(final Future<Void> future) throws Exception {
if (!future.isSuccess()) {
// We expect that a session down will occur at this point
- LOG.debug("Failed to send request {}", req.request, future.cause());
+ LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause());
req.future.setException(future.cause());
} else {
LOG.trace("Finished sending request {}", req.request);