X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=4da727f5c24e56e37c4e11acd6150d9afded1cd8;hp=e78f2b32df0cbc338bad5b28ac1d5063084b9b96;hb=25a9fb7730311a5ca298d8c6c8b24f0afb0e27be;hpb=6c5efc6eed65b8a351edeaa525f36de1edb77a3d diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index e78f2b32df..4da727f5c2 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -10,17 +10,20 @@ package org.opendaylight.controller.sal.connect.netconf.listener; import java.util.ArrayDeque; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Queue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfTerminationReason; +import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; import org.opendaylight.controller.netconf.client.NetconfClientSession; import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration; import org.opendaylight.controller.netconf.util.xml.XmlElement; -import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; @@ -35,6 +38,8 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -45,12 +50,9 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class); - private static final RpcResult FAILED_RPC_RESULT = new FailedRpcResult<>(RpcErrors.getRpcError( - null, null, null, RpcError.ErrorSeverity.ERROR, "Netconf session disconnected", - RpcError.ErrorType.PROTOCOL, null)); - private final RemoteDevice remoteDevice; private final RemoteDeviceId id; + private final Lock sessionLock = new ReentrantLock(); public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice) { @@ -62,14 +64,21 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private NetconfClientSession session; @Override - public synchronized void onSessionUp(final NetconfClientSession session) { - logger.debug("{}: Session established", id); - this.session = session; + public void onSessionUp(final NetconfClientSession session) { + sessionLock.lock(); + try { + logger.debug("{}: Session established", id); + this.session = session; - final NetconfSessionCapabilities netconfSessionCapabilities = NetconfSessionCapabilities.fromNetconfSession(session); - logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities); + final NetconfSessionCapabilities netconfSessionCapabilities = + NetconfSessionCapabilities.fromNetconfSession(session); + logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities); - remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this); + remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this); + } + finally { + sessionLock.unlock(); + } } public void initializeRemoteConnection(final NetconfClientDispatcher dispatch, @@ -77,37 +86,75 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, dispatch.createReconnectingClient(config); } - private synchronized void tearDown(final Exception e) { - remoteDevice.onRemoteSessionDown(); - session = null; + private void tearDown( String reason ) { + List>> futuresToCancel = Lists.newArrayList(); + sessionLock.lock(); + try { + if( session != null ) { + session = null; + + /* + * Walk all requests, check if they have been executing + * or cancelled and remove them from the queue. + */ + final Iterator it = requests.iterator(); + while (it.hasNext()) { + final Request r = it.next(); + if (r.future.isUncancellable()) { + futuresToCancel.add( r.future ); + it.remove(); + } else if (r.future.isCancelled()) { + // This just does some house-cleaning + it.remove(); + } + } - /* - * Walk all requests, check if they have been executing - * or cancelled and remove them from the queue. - */ - final Iterator it = requests.iterator(); - while (it.hasNext()) { - final Request r = it.next(); - if (r.future.isUncancellable()) { - r.future.setException(e); - it.remove(); - } else if (r.future.isCancelled()) { - // This just does some house-cleaning - it.remove(); + remoteDevice.onRemoteSessionDown(); + } + } + finally { + sessionLock.unlock(); + } + + // Notify pending request futures outside of the sessionLock to avoid unnecessarily + // blocking the caller. + for( UncancellableFuture> future: futuresToCancel ) { + if( Strings.isNullOrEmpty( reason ) ) { + future.set( createSessionDownRpcResult() ); + } else { + future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, reason ) ); } } } + private RpcResult createSessionDownRpcResult() + { + return createErrorRpcResult( RpcError.ErrorType.TRANSPORT, + String.format( "The netconf session to %1$s is disconnected", id.getName() ) ); + } + + private RpcResult createErrorRpcResult( RpcError.ErrorType errorType, String message ) + { + return new FailedRpcResult( RpcErrors.getRpcError( null, + NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), + null, RpcError.ErrorSeverity.ERROR, message, errorType, null ) ); + } + @Override public void onSessionDown(final NetconfClientSession session, final Exception e) { logger.warn("{}: Session went down", id, e); - tearDown(e); + tearDown( null ); } @Override public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) { logger.warn("{}: Session terminated {}", id, reason); - tearDown(new RuntimeException(reason.getErrorMessage())); + tearDown( reason.getErrorMessage() ); + } + + @Override + public void close() { + tearDown( String.format( "The netconf session to %1$s has been closed", id.getName() ) ); } @Override @@ -123,73 +170,109 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } } - private synchronized void processMessage(final NetconfMessage message) { - final Request r = requests.peek(); - if (r.future.isUncancellable()) { - requests.poll(); + private void processMessage(final NetconfMessage message) { + Request request = null; + sessionLock.lock(); + try { + request = requests.peek(); + if (request.future.isUncancellable()) { + requests.poll(); + } + else { + request = null; + logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message)); + } + } + finally { + sessionLock.unlock(); + } + + if( request != null ) { logger.debug("{}: Message received {}", id, message); if(logger.isTraceEnabled()) { - logger.trace("{}: Matched request: {} to response: {}", id, msgToS(r.request), msgToS(message)); + logger.trace( "{}: Matched request: {} to response: {}", id, + msgToS( request.request ), msgToS( message ) ); } try { - NetconfMessageTransformUtil.checkValidReply(r.request, message); - } catch (final IllegalStateException e) { - logger.warn("{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", id, - msgToS(r.request), msgToS(message), e); - r.future.setException(e); + NetconfMessageTransformUtil.checkValidReply( request.request, message ); + } + catch (final NetconfDocumentedException e) { + logger.warn( "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", + id, msgToS( request.request ), msgToS( message ), e ); + + request.future.set( new FailedRpcResult( + NetconfMessageTransformUtil.toRpcError( e ) ) ); return; } try { NetconfMessageTransformUtil.checkSuccessReply(message); - } catch (NetconfDocumentedException | IllegalStateException e) { - logger.warn("{}: Error reply from remote device, request: {}, response: {}", id, - msgToS(r.request), msgToS(message), e); - r.future.setException(e); + } + catch( NetconfDocumentedException e ) { + logger.warn( "{}: Error reply from remote device, request: {}, response: {}", id, + msgToS( request.request ), msgToS( message ), e ); + + request.future.set( new FailedRpcResult( + NetconfMessageTransformUtil.toRpcError( e ) ) ); return; } - r.future.set(Rpcs.getRpcResult(true, message, Collections.emptySet())); - } else { - logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message)); + request.future.set(Rpcs.getRpcResult( true, message, Collections.emptySet() ) ); } } - @Override - public void close() { - tearDown(new RuntimeException("Closed")); - } - private static String msgToS(final NetconfMessage msg) { return XmlUtil.toString(msg.getDocument()); } @Override - public synchronized ListenableFuture> sendRequest(final NetconfMessage message, final QName rpc) { + public ListenableFuture> sendRequest( + final NetconfMessage message, final QName rpc) { + sessionLock.lock(); + try { + return sendRequestWithLock( message, rpc ); + } + finally { + sessionLock.unlock(); + } + } + + private ListenableFuture> sendRequestWithLock( + final NetconfMessage message, final QName rpc) { if(logger.isTraceEnabled()) { logger.trace("{}: Sending message {}", id, msgToS(message)); } if (session == null) { logger.warn("{}: Session is disconnected, failing RPC request {}", id, message); - return Futures.immediateFuture(FAILED_RPC_RESULT); + return Futures.immediateFuture( createSessionDownRpcResult() ); } - final Request req = new Request(new UncancellableFuture>(true), message, rpc); + final Request req = new Request( new UncancellableFuture>(true), + message ); requests.add(req); session.sendMessage(req.request).addListener(new FutureListener() { @Override public void operationComplete(final Future future) throws Exception { - if (!future.isSuccess()) { + if( !future.isSuccess() ) { // We expect that a session down will occur at this point - logger.debug("{}: Failed to send request {}", id, XmlUtil.toString(req.request.getDocument()), future.cause()); - req.future.setException(future.cause()); - } else { - logger.trace("{}: Finished sending request {}", id, req.request); + logger.debug( "{}: Failed to send request {}", id, + XmlUtil.toString(req.request.getDocument()), future.cause() ); + + if( future.cause() != null ) { + req.future.set( createErrorRpcResult( RpcError.ErrorType.TRANSPORT, + future.cause().getLocalizedMessage() ) ); + } else { + req.future.set( createSessionDownRpcResult() ); // assume session is down + } + req.future.setException( future.cause() ); + } + else { + logger.trace( "Finished sending request {}", req.request ); } } }); @@ -215,12 +298,11 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private static final class Request { final UncancellableFuture> future; final NetconfMessage request; - final QName rpc; - private Request(final UncancellableFuture> future, final NetconfMessage request, final QName rpc) { + private Request(final UncancellableFuture> future, + final NetconfMessage request) { this.future = future; this.request = request; - this.rpc = rpc; } } }