private final RemoteDeviceId id;
private final Lock sessionLock = new ReentrantLock();
+ // TODO implement concurrent message limit
private final Queue<Request> requests = new ArrayDeque<>();
private NetconfClientSession session;
+ private Future<?> initFuture;
public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice,
final NetconfSessionCapabilities netconfSessionCapabilities) {
public void initializeRemoteConnection(final NetconfClientDispatcher dispatch,
final NetconfClientConfiguration config) {
if(config instanceof NetconfReconnectingClientConfiguration) {
- dispatch.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+ initFuture = dispatch.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
} else {
- dispatch.createClient(config);
+ initFuture = dispatch.createClient(config);
}
}
@Override
public void close() {
- tearDown( String.format( "The netconf session to %1$s has been closed", id.getName() ) );
+ // Cancel reconnect if in progress
+ if(initFuture != null) {
+ initFuture.cancel(false);
+ }
+ // Disconnect from device
+ if(session != null) {
+ session.close();
+ }
+ tearDown(id + ": Netconf session closed");
}
@Override
private void processMessage(final NetconfMessage message) {
Request request = null;
sessionLock.lock();
+
try {
request = requests.peek();
- if (request.future.isUncancellable()) {
+ if (request != null && request.future.isUncancellable()) {
requests.poll();
- }
- else {
+ } else {
request = null;
logger.warn("{}: Ignoring unsolicited message {}", id, msgToS(message));
}
try {
NetconfMessageTransformUtil.checkSuccessReply(message);
}
- catch( NetconfDocumentedException e ) {
+ catch(final NetconfDocumentedException e) {
logger.warn( "{}: Error reply from remote device, request: {}, response: {}", id,
msgToS( request.request ), msgToS( message ), e );