Netconf-connector did not reconnect after recent changes (Ssh mina integration)
SshHandler was in pipeline after listener in reconnect, thats why the listener received no event about session down.
Change-Id: Id39062f51bc3a0caf066ca49682a2acc837b06ef
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- initializer.initializeChannel(channel, promise);
-
// add closed channel handler
// add closed channel handler
+ // This handler has to be added before initializer.initializeChannel is called
+ // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case
+ // closed channel handler is before the handler that invokes channel inactive event
channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
+
+ initializer.initializeChannel(channel, promise);
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // Pass info about disconnect further and then reconnect
+ super.channelInactive(ctx);
+
if (promise.isCancelled()) {
return;
}
if (promise.isCancelled()) {
return;
}
- public void onDeviceDisconnected() {
+ public synchronized void onDeviceDisconnected() {
salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
salProvider.getMountInstance().onDeviceDisconnected();
}
salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
salProvider.getMountInstance().onDeviceDisconnected();
}
@Override
public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
try {
@Override
public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
try {
+ // ssh handler has to be the first handler in pipeline
ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler));
super.initialize(ch,promise);
} catch (final IOException e) {
ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler));
super.initialize(ch,promise);
} catch (final IOException e) {
connectPromise.setSuccess();
connectPromise = null;
connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
+ sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
ctx.fireChannelActive();
sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
ctx.fireChannelActive();
sshWriteAsyncHandler.write(ctx, msg, promise);
}
sshWriteAsyncHandler.write(ctx, msg, promise);
}
- private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
- logger.debug("SSH session closed on channel: {}", ctx.channel());
- ctx.fireChannelInactive();
- }
-
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
this.connectPromise = promise;
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
this.connectPromise = promise;
channel = null;
promise.setSuccess();
channel = null;
promise.setSuccess();
- handleSshSessionClosed(ctx);
+ logger.debug("SSH session closed on channel: {}", ctx.channel());
+ ctx.fireChannelInactive();
private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final int BUFFER_SIZE = 8192;
private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final int BUFFER_SIZE = 8192;
+ private final ChannelOutboundHandler asyncSshHandler;
private final ChannelHandlerContext ctx;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
private final ChannelHandlerContext ctx;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ this.asyncSshHandler = asyncSshHandler;
this.ctx = ctx;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
this.ctx = ctx;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
- // We are closing
- handleSshSessionClosed(ctx);
+
+ // Ssh dropped
+ logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+ invokeDisconnect();
+ return;
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
+ private void invokeDisconnect() {
+ try {
+ asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception e) {
+ // This should not happen
+ throw new IllegalStateException(e);
+ }
+ }
+
@Override
public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
@Override
public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
this.asyncIn = asyncIn;
}
this.asyncIn = asyncIn;
}
public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
try {
public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
try {
- if(asyncIn.isClosed() || asyncIn.isClosing()) {
- handleSshSessionClosed(ctx);
+ if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+ // If we are closed/closing, set immediate fail
+ promise.setFailure(new IllegalStateException("Channel closed"));
} else {
lastWriteFuture = asyncIn.write(toBuffer(msg));
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
} else {
lastWriteFuture = asyncIn.write(toBuffer(msg));
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
// Notify success or failure
if (future.isWritten()) {
promise.setSuccess();
// Notify success or failure
if (future.isWritten()) {
promise.setSuccess();
+ } else {
+ promise.setFailure(future.getException());
- promise.setFailure(future.getException());
// Reset last pending future
synchronized (SshWriteAsyncHandler.this) {
// Reset last pending future
synchronized (SshWriteAsyncHandler.this) {
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
- if(future.isWritten()) {
+ if (future.isWritten()) {
synchronized (SshWriteAsyncHandler.this) {
// Pending done, decrease counter
pendingWriteCounter--;
synchronized (SshWriteAsyncHandler.this) {
// Pending done, decrease counter
pendingWriteCounter--;
AuthProvider authProvider = mock(AuthProviderImpl.class);
doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray();
doReturn(true).when(authProvider).authenticated(anyString(), anyString());
AuthProvider authProvider = mock(AuthProviderImpl.class);
doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray();
doReturn(true).when(authProvider).authenticated(anyString(), anyString());
+ doReturn("auth").when(authProvider).toString();
+
NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(),
authProvider, new NioEventLoopGroup());
NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(),
authProvider, new NioEventLoopGroup());