BUG-732 Fix sal-netconf-connector unable to download schemas
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDeviceListener.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import com.google.common.collect.Sets;
11 import io.netty.util.concurrent.Future;
12 import io.netty.util.concurrent.FutureListener;
13
14 import java.util.ArrayDeque;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Iterator;
18 import java.util.Queue;
19 import java.util.Set;
20
21 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
22 import org.opendaylight.controller.netconf.api.NetconfMessage;
23 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
24 import org.opendaylight.controller.netconf.client.NetconfClientSession;
25 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
26 import org.opendaylight.controller.netconf.util.xml.XmlElement;
27 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
28 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
29 import org.opendaylight.controller.sal.common.util.Rpcs;
30 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.common.RpcError;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
35 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
36 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import com.google.common.base.Preconditions;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.ListenableFuture;
43
44 class NetconfDeviceListener implements NetconfClientSessionListener {
45
46     private static final class Request {
47         final UncancellableFuture<RpcResult<CompositeNode>> future;
48         final NetconfMessage request;
49         final QName rpc;
50
51         private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request, final QName rpc) {
52             this.future = future;
53             this.request = request;
54             this.rpc = rpc;
55         }
56     }
57
58     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
59     private final Queue<Request> requests = new ArrayDeque<>();
60     private final NetconfDevice device;
61     private NetconfClientSession session;
62
63     public NetconfDeviceListener(final NetconfDevice device) {
64         this.device = Preconditions.checkNotNull(device);
65     }
66
67     @Override
68     public synchronized void onSessionUp(final NetconfClientSession session) {
69         LOG.debug("Session with {} established as address {} session-id {}",
70                 device.getName(), device.getSocketAddress(), session.getSessionId());
71
72         this.session = session;
73
74         final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
75         LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
76
77         // Select the appropriate provider
78         final SchemaSourceProvider<String> delegate;
79         if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
80             delegate = new NetconfRemoteSchemaSourceProvider(device);
81             // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
82         } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
83             delegate = new NetconfRemoteSchemaSourceProvider(device);
84         } else {
85             LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
86             delegate = SchemaSourceProviders.noopProvider();
87         }
88
89         device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
90
91     }
92
93     private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
94         // TODO rollback capability cannot be searched for in Set<QName> caps
95         // since this set does not contain module-less capabilities
96         return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
97     }
98
99     private synchronized void tearDown(final Exception e) {
100         session = null;
101
102         /*
103          * Walk all requests, check if they have been executing
104          * or cancelled and remove them from the queue.
105          */
106         final Iterator<Request> it = requests.iterator();
107         while (it.hasNext()) {
108             final Request r = it.next();
109             if (r.future.isUncancellable()) {
110                 // FIXME: add a RpcResult instead?
111                 r.future.setException(e);
112                 it.remove();
113             } else if (r.future.isCancelled()) {
114                 // This just does some house-cleaning
115                 it.remove();
116             }
117         }
118
119         device.bringDown();
120     }
121
122     @Override
123     public void onSessionDown(final NetconfClientSession session, final Exception e) {
124         LOG.debug("Session with {} went down", device.getName(), e);
125         tearDown(e);
126     }
127
128     @Override
129     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
130         LOG.debug("Session with {} terminated {}", session, reason);
131         tearDown(new RuntimeException(reason.getErrorMessage()));
132     }
133
134     @Override
135     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
136         /*
137          * Dispatch between notifications and messages. Messages need to be processed
138          * with lock held, notifications do not.
139          */
140         if (isNotification(message)) {
141             processNotification(message);
142         } else {
143             processMessage(message);
144         }
145     }
146
147     private synchronized void processMessage(final NetconfMessage message) {
148         final Request r = requests.peek();
149         if (r.future.isUncancellable()) {
150             requests.poll();
151             LOG.debug("Matched {} to {}", r.request, message);
152
153             try {
154                 NetconfMapping.checkValidReply(r.request, message);
155             } catch (IllegalStateException e) {
156                 LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
157                 r.future.setException(e);
158                 return;
159             }
160
161             try {
162                 NetconfMapping.checkSuccessReply(message);
163             } catch (NetconfDocumentedException | IllegalStateException e) {
164                 LOG.warn("Error reply from remote device", e);
165                 r.future.setException(e);
166                 return;
167             }
168
169             r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext()));
170         } else {
171             LOG.warn("Ignoring unsolicited message", message);
172         }
173     }
174
175     synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message, final QName rpc) {
176         if (session == null) {
177             LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
178             return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
179                 @Override
180                 public boolean isSuccessful() {
181                     return false;
182                 }
183
184                 @Override
185                 public CompositeNode getResult() {
186                     return null;
187                 }
188
189                 @Override
190                 public Collection<RpcError> getErrors() {
191                     // FIXME: indicate that the session is down
192                     return Collections.emptySet();
193                 }
194             });
195         }
196
197         final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message, rpc);
198         requests.add(req);
199
200         session.sendMessage(req.request).addListener(new FutureListener<Void>() {
201             @Override
202             public void operationComplete(final Future<Void> future) throws Exception {
203                 if (!future.isSuccess()) {
204                     // We expect that a session down will occur at this point
205                     LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause());
206                     req.future.setException(future.cause());
207                 } else {
208                     LOG.trace("Finished sending request {}", req.request);
209                 }
210             }
211         });
212
213         return req.future;
214     }
215
216     /**
217      * Process an incoming notification.
218      *
219      * @param notification Notification message
220      */
221     private void processNotification(final NetconfMessage notification) {
222         this.device.logger.debug("Received NETCONF notification.", notification);
223         CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
224         if (domNotification == null) {
225             return;
226         }
227
228         MountProvisionInstance mountInstance =  this.device.getMountInstance();
229         if (mountInstance != null) {
230             mountInstance.publish(domNotification);
231         }
232     }
233
234     private static boolean isNotification(final NetconfMessage message) {
235         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
236         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
237     }
238 }