BUG-732 Add rollback-on-error error-option to edit-config rpcs from netconf-connector
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDeviceListener.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import com.google.common.collect.Sets;
11 import io.netty.util.concurrent.Future;
12 import io.netty.util.concurrent.FutureListener;
13
14 import java.util.ArrayDeque;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Iterator;
18 import java.util.Queue;
19 import java.util.Set;
20
21 import org.opendaylight.controller.netconf.api.NetconfMessage;
22 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
23 import org.opendaylight.controller.netconf.client.NetconfClientSession;
24 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
25 import org.opendaylight.controller.netconf.util.xml.XmlElement;
26 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
27 import org.opendaylight.controller.sal.common.util.Rpcs;
28 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yangtools.yang.common.RpcError;
31 import org.opendaylight.yangtools.yang.common.RpcResult;
32 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
33 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
34 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.google.common.base.Preconditions;
39 import com.google.common.util.concurrent.Futures;
40 import com.google.common.util.concurrent.ListenableFuture;
41
42 class NetconfDeviceListener implements NetconfClientSessionListener {
43
44     private static final class Request {
45         final UncancellableFuture<RpcResult<CompositeNode>> future;
46         final NetconfMessage request;
47
48         private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
49             this.future = future;
50             this.request = request;
51         }
52     }
53
54     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
55     private final Queue<Request> requests = new ArrayDeque<>();
56     private final NetconfDevice device;
57     private NetconfClientSession session;
58
59     public NetconfDeviceListener(final NetconfDevice device) {
60         this.device = Preconditions.checkNotNull(device);
61     }
62
63     @Override
64     public synchronized void onSessionUp(final NetconfClientSession session) {
65         LOG.debug("Session with {} established as address {} session-id {}",
66                 device.getName(), device.getSocketAddress(), session.getSessionId());
67
68         this.session = session;
69
70         final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
71         LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
72
73         // Select the appropriate provider
74         final SchemaSourceProvider<String> delegate;
75         if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
76             delegate = new NetconfRemoteSchemaSourceProvider(device);
77             // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
78         } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
79             delegate = new NetconfRemoteSchemaSourceProvider(device);
80         } else {
81             LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
82             delegate = SchemaSourceProviders.noopProvider();
83         }
84
85         device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
86
87     }
88
89     private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
90         // TODO rollback capability cannot be searched for in Set<QName> caps
91         // since this set does not contain module-less capabilities
92         return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
93     }
94
95     private synchronized void tearDown(final Exception e) {
96         session = null;
97
98         /*
99          * Walk all requests, check if they have been executing
100          * or cancelled and remove them from the queue.
101          */
102         final Iterator<Request> it = requests.iterator();
103         while (it.hasNext()) {
104             final Request r = it.next();
105             if (r.future.isUncancellable()) {
106                 // FIXME: add a RpcResult instead?
107                 r.future.setException(e);
108                 it.remove();
109             } else if (r.future.isCancelled()) {
110                 // This just does some house-cleaning
111                 it.remove();
112             }
113         }
114
115         device.bringDown();
116     }
117
118     @Override
119     public void onSessionDown(final NetconfClientSession session, final Exception e) {
120         LOG.debug("Session with {} went down", device.getName(), e);
121         tearDown(e);
122     }
123
124     @Override
125     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
126         LOG.debug("Session with {} terminated {}", session, reason);
127         tearDown(new RuntimeException(reason.getErrorMessage()));
128     }
129
130     @Override
131     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
132         /*
133          * Dispatch between notifications and messages. Messages need to be processed
134          * with lock held, notifications do not.
135          */
136         if (isNotification(message)) {
137             processNotification(message);
138         } else {
139             processMessage(message);
140         }
141     }
142
143     private synchronized void processMessage(final NetconfMessage message) {
144         final Request r = requests.peek();
145         if (r.future.isUncancellable()) {
146             requests.poll();
147             LOG.debug("Matched {} to {}", r.request, message);
148
149             try {
150                 NetconfMapping.checkValidReply(r.request, message);
151             } catch (IllegalStateException e) {
152                 LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
153                 r.future.setException(e);
154                 return;
155             }
156
157             try {
158                 NetconfMapping.checkSuccessReply(message);
159             } catch (IllegalStateException e) {
160                 LOG.warn("Error reply from remote device", e);
161                 r.future.setException(e);
162                 return;
163             }
164
165             r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
166                     Collections.<RpcError>emptyList()));
167         } else {
168             LOG.warn("Ignoring unsolicited message", message);
169         }
170     }
171
172     synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
173         if (session == null) {
174             LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
175             return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
176                 @Override
177                 public boolean isSuccessful() {
178                     return false;
179                 }
180
181                 @Override
182                 public CompositeNode getResult() {
183                     return null;
184                 }
185
186                 @Override
187                 public Collection<RpcError> getErrors() {
188                     // FIXME: indicate that the session is down
189                     return Collections.emptySet();
190                 }
191             });
192         }
193
194         final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
195         requests.add(req);
196
197         session.sendMessage(req.request).addListener(new FutureListener<Void>() {
198             @Override
199             public void operationComplete(final Future<Void> future) throws Exception {
200                 if (!future.isSuccess()) {
201                     // We expect that a session down will occur at this point
202                     LOG.debug("Failed to send request {}", req.request, future.cause());
203                     req.future.setException(future.cause());
204                 } else {
205                     LOG.trace("Finished sending request {}", req.request);
206                 }
207             }
208         });
209
210         return req.future;
211     }
212
213     /**
214      * Process an incoming notification.
215      *
216      * @param notification Notification message
217      */
218     private void processNotification(final NetconfMessage notification) {
219         this.device.logger.debug("Received NETCONF notification.", notification);
220         CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
221         if (domNotification == null) {
222             return;
223         }
224
225         MountProvisionInstance mountInstance =  this.device.getMountInstance();
226         if (mountInstance != null) {
227             mountInstance.publish(domNotification);
228         }
229     }
230
231     private static boolean isNotification(final NetconfMessage message) {
232         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
233         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
234     }
235 }