--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connect.netconf.listener;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.opendaylight.controller.config.util.xml.XmlElement;
+import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
+
+ private final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
+ private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
+ private final RemoteDeviceId id;
+ private final Lock sessionLock = new ReentrantLock();
+
+ // TODO implement concurrent message limit
+ private final Queue<Request> requests = new ArrayDeque<>();
+ private NetconfClientSession session;
+ private Future<?> initFuture;
+
+ public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+ final NetconfSessionPreferences NetconfSessionPreferences) {
+ this(id, remoteDevice, Optional.of(NetconfSessionPreferences));
+ }
+
+ public NetconfDeviceCommunicator(final RemoteDeviceId id,
+ final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice) {
+ this(id, remoteDevice, Optional.<NetconfSessionPreferences>absent());
+ }
+
+ private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+ final Optional<NetconfSessionPreferences> overrideNetconfCapabilities) {
+ this.id = id;
+ this.remoteDevice = remoteDevice;
+ this.overrideNetconfCapabilities = overrideNetconfCapabilities;
+ }
+
+ @Override
+ public void onSessionUp(final NetconfClientSession session) {
+ sessionLock.lock();
+ try {
+ LOG.debug("{}: Session established", id);
+ this.session = session;
+
+ NetconfSessionPreferences netconfSessionPreferences =
+ NetconfSessionPreferences.fromNetconfSession(session);
+ LOG.trace("{}: Session advertised capabilities: {}", id,
+ netconfSessionPreferences);
+
+ if(overrideNetconfCapabilities.isPresent()) {
+ netconfSessionPreferences = netconfSessionPreferences.addModuleCaps(overrideNetconfCapabilities.get());
+ LOG.debug(
+ "{}: Session capabilities overridden, capabilities that will be used: {}",
+ id, netconfSessionPreferences);
+ }
+
+ remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
+ }
+ finally {
+ sessionLock.unlock();
+ }
+ }
+
+ public void initializeRemoteConnection(final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+ // TODO 2313 extract listener from configuration
+ if(config instanceof NetconfReconnectingClientConfiguration) {
+ initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+ } else {
+ initFuture = dispatcher.createClient(config);
+ }
+
+
+ initFuture.addListener(new GenericFutureListener<Future<Object>>(){
+
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess() && !future.isCancelled()) {
+ LOG.debug("{}: Connection failed", id, future.cause());
+ NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
+ }
+ }
+ });
+
+ }
+
+ public void disconnect() {
+ if(session != null) {
+ session.close();
+ }
+ }
+
+ private void tearDown( String reason ) {
+ List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
+ sessionLock.lock();
+ try {
+ if( session != null ) {
+ session = null;
+
+ /*
+ * Walk all requests, check if they have been executing
+ * or cancelled and remove them from the queue.
+ */
+ final Iterator<Request> it = requests.iterator();
+ while (it.hasNext()) {
+ final Request r = it.next();
+ if (r.future.isUncancellable()) {
+ futuresToCancel.add( r.future );
+ it.remove();
+ } else if (r.future.isCancelled()) {
+ // This just does some house-cleaning
+ it.remove();
+ }
+ }
+
+ remoteDevice.onRemoteSessionDown();
+ }
+ }
+ finally {
+ sessionLock.unlock();
+ }
+
+ // Notify pending request futures outside of the sessionLock to avoid unnecessarily
+ // blocking the caller.
+ for( UncancellableFuture<RpcResult<NetconfMessage>> future: futuresToCancel ) {
+ if( Strings.isNullOrEmpty( reason ) ) {
+ future.set( createSessionDownRpcResult() );
+ } else {
+ future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) );
+ }
+ }
+ }
+
+ private RpcResult<NetconfMessage> createSessionDownRpcResult() {
+ return createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
+ String.format( "The netconf session to %1$s is disconnected", id.getName() ) );
+ }
+
+ private RpcResult<NetconfMessage> createErrorRpcResult( RpcError.ErrorType errorType, String message ) {
+ return RpcResultBuilder.<NetconfMessage>failed()
+ .withError(errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), message).build();
+ }
+
+ @Override
+ public void onSessionDown(final NetconfClientSession session, final Exception e) {
+ LOG.warn("{}: Session went down", id, e);
+ tearDown( null );
+ }
+
+ @Override
+ public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+ LOG.warn("{}: Session terminated {}", id, reason);
+ tearDown( reason.getErrorMessage() );
+ }
+
+ @Override
+ public void close() {
+ // Cancel reconnect if in progress
+ if(initFuture != null) {
+ initFuture.cancel(false);
+ }
+ // Disconnect from device
+ if(session != null) {
+ session.close();
+ // tear down not necessary, called indirectly by above close
+ }
+ }
+
+ @Override
+ public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+ /*
+ * Dispatch between notifications and messages. Messages need to be processed
+ * with lock held, notifications do not.
+ */
+ if (isNotification(message)) {
+ processNotification(message);
+ } else {
+ processMessage(message);
+ }
+ }
+
+ private void processMessage(final NetconfMessage message) {
+ Request request = null;
+ sessionLock.lock();
+
+ try {
+ request = requests.peek();
+ if (request != null && request.future.isUncancellable()) {
+ requests.poll();
+ } else {
+ request = null;
+ LOG.warn("{}: Ignoring unsolicited message {}", id,
+ msgToS(message));
+ }
+ }
+ finally {
+ sessionLock.unlock();
+ }
+
+ if( request != null ) {
+
+ LOG.debug("{}: Message received {}", id, message);
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace( "{}: Matched request: {} to response: {}", id, msgToS( request.request ), msgToS( message ) );
+ }
+
+ try {
+ NetconfMessageTransformUtil.checkValidReply( request.request, message );
+ } catch (final NetconfDocumentedException e) {
+ LOG.warn(
+ "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}",
+ id, msgToS(request.request), msgToS(message), e);
+
+ request.future.set( RpcResultBuilder.<NetconfMessage>failed()
+ .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() );
+
+ //recursively processing message to eventually find matching request
+ processMessage(message);
+
+ return;
+ }
+
+ try {
+ NetconfMessageTransformUtil.checkSuccessReply(message);
+ } catch(final NetconfDocumentedException e) {
+ LOG.warn(
+ "{}: Error reply from remote device, request: {}, response: {}",
+ id, msgToS(request.request), msgToS(message), e);
+
+ request.future.set( RpcResultBuilder.<NetconfMessage>failed()
+ .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() );
+ return;
+ }
+
+ request.future.set( RpcResultBuilder.success( message ).build() );
+ }
+ }
+
+ private static String msgToS(final NetconfMessage msg) {
+ return XmlUtil.toString(msg.getDocument());
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
+ sessionLock.lock();
+ try {
+ return sendRequestWithLock( message, rpc );
+ } finally {
+ sessionLock.unlock();
+ }
+ }
+
+ private ListenableFuture<RpcResult<NetconfMessage>> sendRequestWithLock(
+ final NetconfMessage message, final QName rpc) {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: Sending message {}", id, msgToS(message));
+ }
+
+ if (session == null) {
+ LOG.warn("{}: Session is disconnected, failing RPC request {}",
+ id, message);
+ return Futures.immediateFuture( createSessionDownRpcResult() );
+ }
+
+ final Request req = new Request( new UncancellableFuture<RpcResult<NetconfMessage>>(true),
+ message );
+ requests.add(req);
+
+ session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+ @Override
+ public void operationComplete(final Future<Void> future) throws Exception {
+ if( !future.isSuccess() ) {
+ // We expect that a session down will occur at this point
+ LOG.debug("{}: Failed to send request {}", id,
+ XmlUtil.toString(req.request.getDocument()),
+ future.cause());
+
+ if( future.cause() != null ) {
+ req.future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT,
+ future.cause().getLocalizedMessage() ) );
+ } else {
+ req.future.set( createSessionDownRpcResult() ); // assume session is down
+ }
+ req.future.setException( future.cause() );
+ }
+ else {
+ LOG.trace("Finished sending request {}", req.request);
+ }
+ }
+ });
+
+ return req.future;
+ }
+
+ private void processNotification(final NetconfMessage notification) {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: Notification received: {}", id, notification);
+ }
+
+ remoteDevice.onNotification(notification);
+ }
+
+ private static boolean isNotification(final NetconfMessage message) {
+ final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
+ return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
+ }
+
+ private static final class Request {
+ final UncancellableFuture<RpcResult<NetconfMessage>> future;
+ final NetconfMessage request;
+
+ private Request(final UncancellableFuture<RpcResult<NetconfMessage>> future,
+ final NetconfMessage request) {
+ this.future = future;
+ this.request = request;
+ }
+ }
+}