*/
package org.opendaylight.controller.sal.connect.netconf;
-import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Promise;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.eclipse.xtext.xbase.lib.Exceptions;
-import org.eclipse.xtext.xbase.lib.Functions.Function0;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
import org.opendaylight.controller.netconf.client.NetconfClientSession;
import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
-import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.controller.sal.connect.netconf.NetconfMapping;
+import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.w3c.dom.Document;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
+import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class NetconfDeviceListener implements NetconfClientSessionListener {
+
+ private static final class Request {
+ final UncancellableFuture<RpcResult<CompositeNode>> future;
+ final NetconfMessage request;
+
+ private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
+ this.future = future;
+ this.request = request;
+ }
+ }
-@SuppressWarnings("all")
-class NetconfDeviceListener extends NetconfClientSessionListener {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceListener.class);
+ private final Queue<Request> requests = new ArrayDeque<>();
private final NetconfDevice device;
- private final EventExecutor eventExecutor;
+ private NetconfClientSession session;
- public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) {
- this.device = device;
- this.eventExecutor = eventExecutor;
+ public NetconfDeviceListener(final NetconfDevice device) {
+ this.device = Preconditions.checkNotNull(device);
}
- private Promise<NetconfMessage> messagePromise;
- private ConcurrentMap<String, Promise<NetconfMessage>> promisedMessages;
+ @Override
+ public synchronized void onSessionUp(final NetconfClientSession session) {
+ LOG.debug("Session with {} established as address {} session-id {}",
+ device.getName(), device.getSocketAddress(), session.getSessionId());
- private final ReentrantLock promiseLock = new ReentrantLock();
+ this.session = session;
- public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
- if (isNotification(message)) {
- this.onNotification(session, message);
+ final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
+ LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
+
+ // Select the appropriate provider
+ final SchemaSourceProvider<String> delegate;
+ if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
+ delegate = new NetconfRemoteSchemaSourceProvider(device);
+ // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
+ } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+ delegate = new NetconfRemoteSchemaSourceProvider(device);
} else {
- try {
- this.promiseLock.lock();
- boolean _notEquals = (!Objects.equal(this.messagePromise, null));
- if (_notEquals) {
- this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message);
- this.messagePromise.setSuccess(message);
- this.messagePromise = null;
- }
- } finally {
- this.promiseLock.unlock();
- }
+ LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
+ delegate = SchemaSourceProviders.noopProvider();
}
+
+ device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
+
}
- /**
- * Method intended to customize notification processing.
- *
- * @param session
- * {@see
- * NetconfClientSessionListener#onMessage(NetconfClientSession,
- * NetconfMessage)}
- * @param message
- * {@see
- * NetconfClientSessionListener#onMessage(NetconfClientSession,
- * NetconfMessage)}
- */
- public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
- this.device.logger.debug("Received NETCONF notification.", message);
- CompositeNode domNotification = null;
- if (message != null) {
- domNotification = NetconfMapping.toNotificationNode(message, device.getSchemaContext());
- }
- if (domNotification != null) {
- MountProvisionInstance _mountInstance = null;
- if (this.device != null) {
- _mountInstance = this.device.getMountInstance();
- }
- if (_mountInstance != null) {
- _mountInstance.publish(domNotification);
+ private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
+ // TODO rollback capability cannot be searched for in Set<QName> caps
+ // since this set does not contain module-less capabilities
+ return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
+ }
+
+ private synchronized void tearDown(final Exception e) {
+ session = null;
+
+ /*
+ * Walk all requests, check if they have been executing
+ * or cancelled and remove them from the queue.
+ */
+ final Iterator<Request> it = requests.iterator();
+ while (it.hasNext()) {
+ final Request r = it.next();
+ if (r.future.isUncancellable()) {
+ // FIXME: add a RpcResult instead?
+ r.future.setException(e);
+ it.remove();
+ } else if (r.future.isCancelled()) {
+ // This just does some house-cleaning
+ it.remove();
}
}
+
+ device.bringDown();
}
- private static CompositeNode getNotificationBody(final CompositeNode node) {
- List<Node<? extends Object>> _children = node.getChildren();
- for (final Node<? extends Object> child : _children) {
- if ((child instanceof CompositeNode)) {
- return ((CompositeNode) child);
- }
+ @Override
+ public void onSessionDown(final NetconfClientSession session, final Exception e) {
+ LOG.debug("Session with {} went down", device.getName(), e);
+ tearDown(e);
+ }
+
+ @Override
+ public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+ LOG.debug("Session with {} terminated {}", session, reason);
+ tearDown(new RuntimeException(reason.getErrorMessage()));
+ }
+
+ @Override
+ public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+ /*
+ * Dispatch between notifications and messages. Messages need to be processed
+ * with lock held, notifications do not.
+ */
+ if (isNotification(message)) {
+ processNotification(message);
+ } else {
+ processMessage(message);
}
- return null;
}
- public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
- final Promise<NetconfMessage> promise = this.promiseReply();
- this.device.logger.debug("Waiting for reply {}", promise);
- int _plus = (attempts * attemptMsDelay);
- final boolean messageAvailable = promise.await(_plus);
- if (messageAvailable) {
+ private synchronized void processMessage(final NetconfMessage message) {
+ final Request r = requests.peek();
+ if (r.future.isUncancellable()) {
+ requests.poll();
+ LOG.debug("Matched {} to {}", r.request, message);
+
+ try {
+ NetconfMapping.checkValidReply(r.request, message);
+ } catch (IllegalStateException e) {
+ LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
+ r.future.setException(e);
+ return;
+ }
+
try {
- try {
- return promise.get();
- } catch (Throwable _e) {
- throw Exceptions.sneakyThrow(_e);
+ NetconfMapping.checkSuccessReply(message);
+ } catch (IllegalStateException e) {
+ LOG.warn("Error reply from remote device", e);
+ r.future.setException(e);
+ return;
+ }
+
+ r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
+ Collections.<RpcError>emptyList()));
+ } else {
+ LOG.warn("Ignoring unsolicited message", message);
+ }
+ }
+
+ synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+ if (session == null) {
+ LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
+ return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
+ @Override
+ public boolean isSuccessful() {
+ return false;
}
- } catch (final Throwable _t) {
- if (_t instanceof ExecutionException) {
- final ExecutionException e = (ExecutionException) _t;
- IllegalStateException _illegalStateException = new IllegalStateException(e);
- throw _illegalStateException;
+
+ @Override
+ public CompositeNode getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ // FIXME: indicate that the session is down
+ return Collections.emptySet();
+ }
+ });
+ }
+
+ final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+ requests.add(req);
+
+ session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+ @Override
+ public void operationComplete(final Future<Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ // We expect that a session down will occur at this point
+ LOG.debug("Failed to send request {}", req.request, future.cause());
+ req.future.setException(future.cause());
} else {
- throw Exceptions.sneakyThrow(_t);
+ LOG.trace("Finished sending request {}", req.request);
}
}
- }
- String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts));
- String _plus_2 = (_plus_1 + " attempts.");
- IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2);
- throw _illegalStateException_1;
+ });
+
+ return req.future;
}
- public synchronized Promise<NetconfMessage> promiseReply() {
- this.device.logger.debug("Promising reply.");
- this.promiseLock.lock();
- try {
- boolean _equals = Objects.equal(this.messagePromise, null);
- if (_equals) {
- Promise<NetconfMessage> _newPromise = this.eventExecutor.<NetconfMessage> newPromise();
- this.messagePromise = _newPromise;
- return this.messagePromise;
- }
- return this.messagePromise;
- } finally {
- this.promiseLock.unlock();
+ /**
+ * Process an incoming notification.
+ *
+ * @param notification Notification message
+ */
+ private void processNotification(final NetconfMessage notification) {
+ this.device.logger.debug("Received NETCONF notification.", notification);
+ CompositeNode domNotification = NetconfMapping.toNotificationNode(notification, device.getSchemaContext());
+ if (domNotification == null) {
+ return;
+ }
+
+ MountProvisionInstance mountInstance = this.device.getMountInstance();
+ if (mountInstance != null) {
+ mountInstance.publish(domNotification);
}
}
- public boolean isNotification(final NetconfMessage message) {
- Document _document = message.getDocument();
- final XmlElement xmle = XmlElement.fromDomDocument(_document);
- String _name = xmle.getName();
- return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name);
+ private static boolean isNotification(final NetconfMessage message) {
+ final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
+ return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
}
}