2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connect.netconf;
10 import io.netty.util.concurrent.Future;
11 import io.netty.util.concurrent.FutureListener;
13 import java.util.ArrayDeque;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.Queue;
20 import org.opendaylight.controller.netconf.api.NetconfMessage;
21 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
22 import org.opendaylight.controller.netconf.client.NetconfClientSession;
23 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
24 import org.opendaylight.controller.netconf.util.xml.XmlElement;
25 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
26 import org.opendaylight.controller.sal.common.util.Rpcs;
27 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.common.RpcError;
30 import org.opendaylight.yangtools.yang.common.RpcResult;
31 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
32 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
33 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import com.google.common.base.Preconditions;
38 import com.google.common.util.concurrent.Futures;
39 import com.google.common.util.concurrent.ListenableFuture;
41 class NetconfDeviceListener implements NetconfClientSessionListener {
42 private static final class Request {
43 final UncancellableFuture<RpcResult<CompositeNode>> future;
44 final NetconfMessage request;
46 private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
48 this.request = request;
52 private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
53 private final Queue<Request> requests = new ArrayDeque<>();
54 private final NetconfDevice device;
55 private NetconfClientSession session;
57 public NetconfDeviceListener(final NetconfDevice device) {
58 this.device = Preconditions.checkNotNull(device);
62 public synchronized void onSessionUp(final NetconfClientSession session) {
63 LOG.debug("Session with {} established as address {} session-id {}",
64 device.getName(), device.getSocketAddress(), session.getSessionId());
66 final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
67 LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
69 // Select the appropriate provider
70 final SchemaSourceProvider<String> delegate;
71 if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
72 delegate = new NetconfRemoteSchemaSourceProvider(device);
73 } else if(caps.contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
74 delegate = new NetconfRemoteSchemaSourceProvider(device);
76 LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
77 delegate = SchemaSourceProviders.<String>noopProvider();
80 device.bringUp(delegate, caps);
82 this.session = session;
85 private synchronized void tearDown(final Exception e) {
89 * Walk all requests, check if they have been executing
90 * or cancelled and remove them from the queue.
92 final Iterator<Request> it = requests.iterator();
93 while (it.hasNext()) {
94 final Request r = it.next();
95 if (r.future.isUncancellable()) {
96 // FIXME: add a RpcResult instead?
97 r.future.setException(e);
99 } else if (r.future.isCancelled()) {
100 // This just does some house-cleaning
109 public void onSessionDown(final NetconfClientSession session, final Exception e) {
110 LOG.debug("Session with {} went down", device.getName(), e);
115 public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
116 LOG.debug("Session with {} terminated {}", session, reason);
117 tearDown(new RuntimeException(reason.getErrorMessage()));
121 public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
123 * Dispatch between notifications and messages. Messages need to be processed
124 * with lock held, notifications do not.
126 if (isNotification(message)) {
127 processNotification(message);
129 processMessage(message);
133 private synchronized void processMessage(final NetconfMessage message) {
134 final Request r = requests.peek();
135 if (r.future.isUncancellable()) {
137 LOG.debug("Matched {} to {}", r.request, message);
139 // FIXME: this can throw exceptions, which should result
140 // in the future failing
141 NetconfMapping.checkValidReply(r.request, message);
142 r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
143 Collections.<RpcError>emptyList()));
145 LOG.warn("Ignoring unsolicited message", message);
149 synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
150 if (session == null) {
151 LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
152 return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
154 public boolean isSuccessful() {
159 public CompositeNode getResult() {
164 public Collection<RpcError> getErrors() {
165 // FIXME: indicate that the session is down
166 return Collections.emptySet();
171 final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
174 session.sendMessage(req.request).addListener(new FutureListener<Void>() {
176 public void operationComplete(final Future<Void> future) throws Exception {
177 if (!future.isSuccess()) {
178 // We expect that a session down will occur at this point
179 LOG.debug("Failed to send request {}", req.request, future.cause());
180 req.future.setException(future.cause());
182 LOG.trace("Finished sending request {}", req.request);
191 * Process an incoming notification.
193 * @param notification Notification message
195 private void processNotification(final NetconfMessage notification) {
196 this.device.logger.debug("Received NETCONF notification.", notification);
197 CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
198 if (domNotification == null) {
202 MountProvisionInstance mountInstance = this.device.getMountInstance();
203 if (mountInstance != null) {
204 mountInstance.publish(domNotification);
208 private static boolean isNotification(final NetconfMessage message) {
209 final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
210 return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;