import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.util.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerReader.class);
- private static final int BUFFER_SIZE = 8192;
+ private static final int BUFFER_SIZE = 2048;
private final AutoCloseable connectionClosedCallback;
private final ReadMsgHandler readHandler;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) {
+ public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler,
+ final String channelId, final IoInputStream asyncOut) {
this.connectionClosedCallback = connectionClosedCallback;
this.readHandler = readHandler;
this.channelId = channelId;
this.asyncOut = asyncOut;
- buf = new Buffer(BUFFER_SIZE);
+ buf = new ByteArrayBuffer(BUFFER_SIZE);
asyncOut.read(buf).addListener(this);
}
@Override
- public synchronized void operationComplete(final IoReadFuture future) {
- if(future.getException() != null) {
+ public void operationComplete(final IoReadFuture future) {
+ if (checkDisconnect(future)) {
+ invokeDisconnect();
+ }
+ }
+ private synchronized boolean checkDisconnect(final IoReadFuture future) {
+ if (future.getException() != null) {
//if asyncout is already set to null by close method, do nothing
- if(asyncOut == null) {
- return;
+ if (asyncOut == null) {
+ return false;
}
- if(asyncOut.isClosed() || asyncOut.isClosing()) {
+ if (asyncOut.isClosed() || asyncOut.isClosing()) {
// Ssh dropped
LOG.debug("Ssh session dropped on channel: {}", channelId, future.getException());
} else {
LOG.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
}
- invokeDisconnect();
- return;
- }
-
- if (future.getRead() > 0) {
+ return true;
+ } else if (future.getRead() > 0) {
final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
- if(LOG.isTraceEnabled()) {
- LOG.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reading message on channel: {}, message: {}",
+ channelId, AsyncSshHandlerWriter.byteBufToString(msg));
}
readHandler.onMessageRead(msg);
// Schedule next read
- buf = new Buffer(BUFFER_SIZE);
+ buf = new ByteArrayBuffer(BUFFER_SIZE);
currentReadFuture = asyncOut.read(buf);
currentReadFuture.addListener(this);
}
+ return false;
}
+ /**
+ * Closing of the {@link AsyncSshHandlerReader}. This method should never be called with any locks held since
+ * call to {@link AutoCloseable#close()} can be a source of ABBA deadlock.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void invokeDisconnect() {
try {
connectionClosedCallback.close();
@Override
public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
- if(currentReadFuture != null) {
+ if (currentReadFuture != null) {
currentReadFuture.removeListener(this);
currentReadFuture = null;
}