Merge "Bug 509: Improve logging in InMemoryDataStore."
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDeviceListener.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import io.netty.util.concurrent.Future;
11 import io.netty.util.concurrent.FutureListener;
12
13 import java.util.ArrayDeque;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.Queue;
18 import java.util.Set;
19
20 import org.opendaylight.controller.netconf.api.NetconfMessage;
21 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
22 import org.opendaylight.controller.netconf.client.NetconfClientSession;
23 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
24 import org.opendaylight.controller.netconf.util.xml.XmlElement;
25 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
26 import org.opendaylight.controller.sal.common.util.Rpcs;
27 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.common.RpcError;
30 import org.opendaylight.yangtools.yang.common.RpcResult;
31 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
32 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
33 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.base.Preconditions;
38 import com.google.common.util.concurrent.Futures;
39 import com.google.common.util.concurrent.ListenableFuture;
40
41 class NetconfDeviceListener implements NetconfClientSessionListener {
42     private static final class Request {
43         final UncancellableFuture<RpcResult<CompositeNode>> future;
44         final NetconfMessage request;
45
46         private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
47             this.future = future;
48             this.request = request;
49         }
50     }
51
52     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
53     private final Queue<Request> requests = new ArrayDeque<>();
54     private final NetconfDevice device;
55     private NetconfClientSession session;
56
57     public NetconfDeviceListener(final NetconfDevice device) {
58         this.device = Preconditions.checkNotNull(device);
59     }
60
61     @Override
62     public synchronized void onSessionUp(final NetconfClientSession session) {
63         LOG.debug("Session with {} established as address {} session-id {}",
64                 device.getName(), device.getSocketAddress(), session.getSessionId());
65
66         final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
67         LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
68
69         // Select the appropriate provider
70         final SchemaSourceProvider<String> delegate;
71         if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
72             delegate = new NetconfRemoteSchemaSourceProvider(device);
73         } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
74             delegate = new NetconfRemoteSchemaSourceProvider(device);
75         } else {
76             LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
77             delegate = SchemaSourceProviders.<String>noopProvider();
78         }
79
80         device.bringUp(delegate, caps);
81
82         this.session = session;
83     }
84
85     private synchronized void tearDown(final Exception e) {
86         session = null;
87
88         /*
89          * Walk all requests, check if they have been executing
90          * or cancelled and remove them from the queue.
91          */
92         final Iterator<Request> it = requests.iterator();
93         while (it.hasNext()) {
94             final Request r = it.next();
95             if (r.future.isUncancellable()) {
96                 // FIXME: add a RpcResult instead?
97                 r.future.setException(e);
98                 it.remove();
99             } else if (r.future.isCancelled()) {
100                 // This just does some house-cleaning
101                 it.remove();
102             }
103         }
104
105         device.bringDown();
106     }
107
108     @Override
109     public void onSessionDown(final NetconfClientSession session, final Exception e) {
110         LOG.debug("Session with {} went down", device.getName(), e);
111         tearDown(e);
112     }
113
114     @Override
115     public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
116         LOG.debug("Session with {} terminated {}", session, reason);
117         tearDown(new RuntimeException(reason.getErrorMessage()));
118     }
119
120     @Override
121     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
122         /*
123          * Dispatch between notifications and messages. Messages need to be processed
124          * with lock held, notifications do not.
125          */
126         if (isNotification(message)) {
127             processNotification(message);
128         } else {
129             processMessage(message);
130         }
131     }
132
133     private synchronized void processMessage(final NetconfMessage message) {
134         final Request r = requests.peek();
135         if (r.future.isUncancellable()) {
136             requests.poll();
137             LOG.debug("Matched {} to {}", r.request, message);
138
139             // FIXME: this can throw exceptions, which should result
140             // in the future failing
141             NetconfMapping.checkValidReply(r.request, message);
142             r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
143                     Collections.<RpcError>emptyList()));
144         } else {
145             LOG.warn("Ignoring unsolicited message", message);
146         }
147     }
148
149     synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
150         if (session == null) {
151             LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
152             return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
153                 @Override
154                 public boolean isSuccessful() {
155                     return false;
156                 }
157
158                 @Override
159                 public CompositeNode getResult() {
160                     return null;
161                 }
162
163                 @Override
164                 public Collection<RpcError> getErrors() {
165                     // FIXME: indicate that the session is down
166                     return Collections.emptySet();
167                 }
168             });
169         }
170
171         final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
172         requests.add(req);
173
174         session.sendMessage(req.request).addListener(new FutureListener<Void>() {
175             @Override
176             public void operationComplete(final Future<Void> future) throws Exception {
177                 if (!future.isSuccess()) {
178                     // We expect that a session down will occur at this point
179                     LOG.debug("Failed to send request {}", req.request, future.cause());
180                     req.future.setException(future.cause());
181                 } else {
182                     LOG.trace("Finished sending request {}", req.request);
183                 }
184             }
185         });
186
187         return req.future;
188     }
189
190     /**
191      * Process an incoming notification.
192      *
193      * @param notification Notification message
194      */
195     private void processNotification(final NetconfMessage notification) {
196         this.device.logger.debug("Received NETCONF notification.", notification);
197         CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
198         if (domNotification == null) {
199             return;
200         }
201
202         MountProvisionInstance mountInstance =  this.device.getMountInstance();
203         if (mountInstance != null) {
204             mountInstance.publish(domNotification);
205         }
206     }
207
208     private static boolean isNotification(final NetconfMessage message) {
209         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
210         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
211     }
212 }