X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2Flistener%2FNetconfDeviceCommunicator.java;h=556fc2f1d27982e4a35de0dca8fa936126684b13;hp=4da727f5c24e56e37c4e11acd6150d9afded1cd8;hb=6794f32049ae180ef7f896e08ecf7096cec36edf;hpb=f0f2ca29ae8b5caa769a77854fc5ac883b0a2264 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java index 4da727f5c2..556fc2f1d2 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java @@ -7,14 +7,20 @@ */ package org.opendaylight.controller.sal.connect.netconf.listener; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GenericFutureListener; import java.util.ArrayDeque; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfTerminationReason; @@ -22,47 +28,52 @@ import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; import org.opendaylight.controller.netconf.client.NetconfClientSession; import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlUtil; -import org.opendaylight.controller.sal.common.util.RpcErrors; -import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator; import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; -import org.opendaylight.controller.sal.connect.util.FailedRpcResult; import org.opendaylight.controller.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator { private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class); - private final RemoteDevice remoteDevice; + private final RemoteDevice remoteDevice; + private final Optional overrideNetconfCapabilities; private final RemoteDeviceId id; private final Lock sessionLock = new ReentrantLock(); + // TODO implement concurrent message limit + private final Queue requests = new ArrayDeque<>(); + private NetconfClientSession session; + private Future initFuture; + + public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, + final NetconfSessionPreferences netconfSessionPreferences) { + this(id, remoteDevice, Optional.of(netconfSessionPreferences)); + } + public NetconfDeviceCommunicator(final RemoteDeviceId id, - final RemoteDevice remoteDevice) { + final RemoteDevice remoteDevice) { + this(id, remoteDevice, Optional.absent()); + } + + private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice remoteDevice, + final Optional overrideNetconfCapabilities) { this.id = id; this.remoteDevice = remoteDevice; + this.overrideNetconfCapabilities = overrideNetconfCapabilities; } - private final Queue requests = new ArrayDeque<>(); - private NetconfClientSession session; - @Override public void onSessionUp(final NetconfClientSession session) { sessionLock.lock(); @@ -70,11 +81,16 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, logger.debug("{}: Session established", id); this.session = session; - final NetconfSessionCapabilities netconfSessionCapabilities = - NetconfSessionCapabilities.fromNetconfSession(session); - logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities); + NetconfSessionPreferences netconfSessionPreferences = + NetconfSessionPreferences.fromNetconfSession(session); + logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences); - remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this); + if(overrideNetconfCapabilities.isPresent()) { + netconfSessionPreferences = netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get()); + logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionPreferences); + } + + remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this); } finally { sessionLock.unlock(); @@ -82,8 +98,23 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, } public void initializeRemoteConnection(final NetconfClientDispatcher dispatch, - final NetconfReconnectingClientConfiguration config) { - dispatch.createReconnectingClient(config); + final NetconfClientConfiguration config) { + if(config instanceof NetconfReconnectingClientConfiguration) { + initFuture = dispatch.createReconnectingClient((NetconfReconnectingClientConfiguration) config); + } else { + initFuture = dispatch.createClient(config); + } + + initFuture.addListener(new GenericFutureListener>(){ + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + logger.debug("{}: Connection failed", id, future.cause()); + NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause()); + } + } + }); } private void tearDown( String reason ) { @@ -135,9 +166,10 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private RpcResult createErrorRpcResult( RpcError.ErrorType errorType, String message ) { - return new FailedRpcResult( RpcErrors.getRpcError( null, - NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), - null, RpcError.ErrorSeverity.ERROR, message, errorType, null ) ); + return RpcResultBuilder.failed() + .withError( errorType, NetconfDocumentedException.ErrorTag.operation_failed.getTagValue(), + message ) + .build(); } @Override @@ -154,7 +186,15 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, @Override public void close() { - tearDown( String.format( "The netconf session to %1$s has been closed", id.getName() ) ); + // Cancel reconnect if in progress + if(initFuture != null) { + initFuture.cancel(false); + } + // Disconnect from device + if(session != null) { + session.close(); + } + tearDown(id + ": Netconf session closed"); } @Override @@ -173,12 +213,12 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, private void processMessage(final NetconfMessage message) { Request request = null; sessionLock.lock(); + try { request = requests.peek(); - if (request.future.isUncancellable()) { + if (request != null && request.future.isUncancellable()) { requests.poll(); - } - else { + } else { request = null; logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message)); } @@ -203,24 +243,24 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener, logger.warn( "{}: Invalid request-reply match, reply message contains different message-id, request: {}, response: {}", id, msgToS( request.request ), msgToS( message ), e ); - request.future.set( new FailedRpcResult( - NetconfMessageTransformUtil.toRpcError( e ) ) ); + request.future.set( RpcResultBuilder.failed() + .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() ); return; } try { NetconfMessageTransformUtil.checkSuccessReply(message); } - catch( NetconfDocumentedException e ) { + catch(final NetconfDocumentedException e) { logger.warn( "{}: Error reply from remote device, request: {}, response: {}", id, msgToS( request.request ), msgToS( message ), e ); - request.future.set( new FailedRpcResult( - NetconfMessageTransformUtil.toRpcError( e ) ) ); + request.future.set( RpcResultBuilder.failed() + .withRpcError( NetconfMessageTransformUtil.toRpcError( e ) ).build() ); return; } - request.future.set(Rpcs.getRpcResult( true, message, Collections.emptySet() ) ); + request.future.set( RpcResultBuilder.success( message ).build() ); } }