X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=556fc2f1d27982e4a35de0dca8fa936126684b13;hp=e78f2b32df0cbc338bad5b28ac1d5063084b9b96;hb=6794f32049ae180ef7f896e08ecf7096cec36edf;hpb=48814d6a264b8f13e5db1422336d9ef25cb05fa9 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index e78f2b32df..556fc2f1d2 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -7,107 +7,194 @@ */ package org.opendaylight.controller.sal.connect.netconf.listener; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GenericFutureListener; import java.util.ArrayDeque; -import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Queue; - +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfTerminationReason; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; import org.opendaylight.controller.netconf.client.NetconfClientSession; import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration; import org.opendaylight.controller.netconf.util.xml.XmlElement; -import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; -import org.opendaylight.controller.sal.common.util.RpcErrors; -import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; -import org.opendaylight.controller.sal.connect.util.FailedRpcResult; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator { private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class); - private static final RpcResult FAILED_RPC_RESULT = new FailedRpcResult<>(RpcErrors.getRpcError( - null, null, null, RpcError.ErrorSeverity.ERROR, "Netconf session disconnected", - RpcError.ErrorType.PROTOCOL, null)); - - private final RemoteDevice remoteDevice; + private final RemoteDevice remoteDevice; + private final Optional overrideNetconfCapabilities; private final RemoteDeviceId id; + private final Lock sessionLock = new ReentrantLock(); + + // TODO implement concurrent message limit + private final Queue requests = new ArrayDeque<>(); + private NetconfClientSession session; + private Future initFuture; + + public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, + final NetconfSessionPreferences netconfSessionPreferences) { + this(id, remoteDevice, Optional.of(netconfSessionPreferences)); + } public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice) { + final RemoteDevice remoteDevice) { + this(id, remoteDevice, Optional.absent()); + } + + private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, + final Optional overrideNetconfCapabilities) { this.id = id; this.remoteDevice = remoteDevice; + this.overrideNetconfCapabilities = overrideNetconfCapabilities; } - private final Queue requests = new ArrayDeque<>(); - private NetconfClientSession session; - @Override - public synchronized void onSessionUp(final NetconfClientSession session) { - logger.debug("{}: Session established", id); - this.session = session; - - final NetconfSessionCapabilities netconfSessionCapabilities = NetconfSessionCapabilities.fromNetconfSession(session); - logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities); + public void onSessionUp(final NetconfClientSession session) { + sessionLock.lock(); + try { + logger.debug("{}: Session established", id); + this.session = session; + + NetconfSessionPreferences netconfSessionPreferences = + NetconfSessionPreferences.fromNetconfSession(session); + logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences); + + if(overrideNetconfCapabilities.isPresent()) { + netconfSessionPreferences = netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get()); + logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionPreferences); + } - remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this); + remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this); + } + finally { + sessionLock.unlock(); + } } public void initializeRemoteConnection(final NetconfClientDispatcher dispatch, - final NetconfReconnectingClientConfiguration config) { - dispatch.createReconnectingClient(config); + final NetconfClientConfiguration config) { + if(config instanceof NetconfReconnectingClientConfiguration) { + initFuture = dispatch.createReconnectingClient((NetconfReconnectingClientConfiguration) config); + } else { + initFuture = dispatch.createClient(config); + } + + initFuture.addListener(new GenericFutureListener>(){ + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + logger.debug("{}: Connection failed", id, future.cause()); + NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); + } + } + }); } - private synchronized void tearDown(final Exception e) { - remoteDevice.onRemoteSessionDown(); - session = null; + private void tearDown( String reason ) { + List>> futuresToCancel = Lists.newArrayList(); + sessionLock.lock(); + try { + if( session != null ) { + session = null; + + /* + * Walk all requests, check if they have been executing + * or cancelled and remove them from the queue. + */ + final Iterator it = requests.iterator(); + while (it.hasNext()) { + final Request r = it.next(); + if (r.future.isUncancellable()) { + futuresToCancel.add( r.future ); + it.remove(); + } else if (r.future.isCancelled()) { + // This just does some house-cleaning + it.remove(); + } + } - /* - * Walk all requests, check if they have been executing - * or cancelled and remove them from the queue. - */ - final Iterator it = requests.iterator(); - while (it.hasNext()) { - final Request r = it.next(); - if (r.future.isUncancellable()) { - r.future.setException(e); - it.remove(); - } else if (r.future.isCancelled()) { - // This just does some house-cleaning - it.remove(); + remoteDevice.onRemoteSessionDown(); } } + finally { + sessionLock.unlock(); + } + + // Notify pending request futures outside of the sessionLock to avoid unnecessarily + // blocking the caller. + for( UncancellableFuture> future: futuresToCancel ) { + if( Strings.isNullOrEmpty( reason ) ) { + future.set( createSessionDownRpcResult() ); + } else { + future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) ); + } + } + } + + private RpcResult createSessionDownRpcResult() + { + return createErrorRpcResult( RpcError.ErrorType.TRANSPORT, + String.format( "The netconf session to %1$s is disconnected", id.getName() ) ); + } + + private RpcResult createErrorRpcResult( RpcError.ErrorType errorType, String message ) + { + return RpcResultBuilder.failed() + .withError( errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), + message ) + .build(); } @Override public void onSessionDown(final NetconfClientSession session, final Exception e) { logger.warn("{}: Session went down", id, e); - tearDown(e); + tearDown( null ); } @Override public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { logger.warn("{}: Session terminated {}", id, reason); - tearDown(new RuntimeException(reason.getErrorMessage())); + tearDown( reason.getErrorMessage() ); + } + + @Override + public void close() { + // Cancel reconnect if in progress + if(initFuture != null) { + initFuture.cancel(false); + } + // Disconnect from device + if(session != null) { + session.close(); + } + tearDown(id + ": Netconf session closed"); } @Override @@ -123,73 +210,109 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } } - private synchronized void processMessage(final NetconfMessage message) { - final Request r = requests.peek(); - if (r.future.isUncancellable()) { - requests.poll(); + private void processMessage(final NetconfMessage message) { + Request request = null; + sessionLock.lock(); + + try { + request = requests.peek(); + if (request != null && request.future.isUncancellable()) { + requests.poll(); + } else { + request = null; + logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message)); + } + } + finally { + sessionLock.unlock(); + } + + if( request != null ) { logger.debug("{}: Message received {}", id, message); if(logger.isTraceEnabled()) { - logger.trace("{}: Matched request: {} to response: {}", id, msgToS(r.request), msgToS(message)); + logger.trace( "{}: Matched request: {} to response: {}", id, + msgToS( request.request ), msgToS( message ) ); } try { - NetconfMessageTransformUtil.checkValidReply(r.request, message); - } catch (final IllegalStateException e) { - logger.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", id, - msgToS(r.request), msgToS(message), e); - r.future.setException(e); + NetconfMessageTransformUtil.checkValidReply( request.request, message ); + } + catch (final NetconfDocumentedException e) { + logger.warn( "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", + id, msgToS( request.request ), msgToS( message ), e ); + + request.future.set( RpcResultBuilder.failed() + .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() ); return; } try { NetconfMessageTransformUtil.checkSuccessReply(message); - } catch (NetconfDocumentedException | IllegalStateException e) { - logger.warn("{}: Error reply from remote device, request: {}, response: {}", id, - msgToS(r.request), msgToS(message), e); - r.future.setException(e); + } + catch(final NetconfDocumentedException e) { + logger.warn( "{}: Error reply from remote device, request: {}, response: {}", id, + msgToS( request.request ), msgToS( message ), e ); + + request.future.set( RpcResultBuilder.failed() + .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() ); return; } - r.future.set(Rpcs.getRpcResult(true, message, Collections.emptySet())); - } else { - logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message)); + request.future.set( RpcResultBuilder.success( message ).build() ); } } - @Override - public void close() { - tearDown(new RuntimeException("Closed")); - } - private static String msgToS(final NetconfMessage msg) { return XmlUtil.toString(msg.getDocument()); } @Override - public synchronized ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { + public ListenableFuture> sendRequest( + final NetconfMessage message, final QName rpc) { + sessionLock.lock(); + try { + return sendRequestWithLock( message, rpc ); + } + finally { + sessionLock.unlock(); + } + } + + private ListenableFuture> sendRequestWithLock( + final NetconfMessage message, final QName rpc) { if(logger.isTraceEnabled()) { logger.trace("{}: Sending message {}", id, msgToS(message)); } if (session == null) { logger.warn("{}: Session is disconnected, failing RPC request {}", id, message); - return Futures.immediateFuture(FAILED_RPC_RESULT); + return Futures.immediateFuture( createSessionDownRpcResult() ); } - final Request req = new Request(new UncancellableFuture>(true), message, rpc); + final Request req = new Request( new UncancellableFuture>(true), + message ); requests.add(req); session.sendMessage(req.request).addListener(new FutureListener() { @Override public void operationComplete(final Future future) throws Exception { - if (!future.isSuccess()) { + if( !future.isSuccess() ) { // We expect that a session down will occur at this point - logger.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), future.cause()); - req.future.setException(future.cause()); - } else { - logger.trace("{}: Finished sending request {}", id, req.request); + logger.debug( "{}: Failed to send request {}", id, + XmlUtil.toString(req.request.getDocument()), future.cause() ); + + if( future.cause() != null ) { + req.future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, + future.cause().getLocalizedMessage() ) ); + } else { + req.future.set( createSessionDownRpcResult() ); // assume session is down + } + req.future.setException( future.cause() ); + } + else { + logger.trace( "Finished sending request {}", req.request ); } } }); @@ -215,12 +338,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private static final class Request { final UncancellableFuture> future; final NetconfMessage request; - final QName rpc; - private Request(final UncancellableFuture> future, final NetconfMessage request, final QName rpc) { + private Request(final UncancellableFuture> future, + final NetconfMessage request) { this.future = future; this.request = request; - this.rpc = rpc; } } }