Remove pending limit due to unexpected behaviour with chunked messages.
Extract Reader/Writer into separate classes.
Also lower the amount of requests sent in SSH Stress integration test.
Change-Id: Idff719ac3a6bed9e8939efa01b8306f2884848fe
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
/**
* Test all requests are handled properly and no mismatch occurs in listener
*/
- /* Disabled until fixed
- @Test(timeout = 5*60*1000)
+ @Test(timeout = 6*60*1000)
public void testSecureStress() throws Exception {
- final int requests = 10000;
+ final int requests = 4000;
final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
final NetconfDeviceCommunicator sessionListener = getSessionListener();
assertEquals(requests, responseCounter.get());
}
}
- */
+
private NetconfMessage changeMessageId(final NetconfMessage getConfig, final int i) throws IOException, SAXException {
String s = XmlUtil.toString(getConfig.getDocument(), false);
s = s.replace("101", Integer.toString(i));
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener on async input stream from SSH session.
+ * This listeners schedules reads in a loop until the session is closed or read fails.
+ */
+final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+
+ private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+
+ private static final int BUFFER_SIZE = 8192;
+
+ private final ChannelOutboundHandler asyncSshHandler;
+ private final ChannelHandlerContext ctx;
+
+ private IoInputStream asyncOut;
+ private Buffer buf;
+ private IoReadFuture currentReadFuture;
+
+ public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ this.asyncSshHandler = asyncSshHandler;
+ this.ctx = ctx;
+ this.asyncOut = asyncOut;
+ buf = new Buffer(BUFFER_SIZE);
+ asyncOut.read(buf).addListener(this);
+ }
+
+ @Override
+ public synchronized void operationComplete(final IoReadFuture future) {
+ if(future.getException() != null) {
+ if(asyncOut.isClosed() || asyncOut.isClosing()) {
+ // Ssh dropped
+ logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+ } else {
+ logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+ }
+ invokeDisconnect();
+ return;
+ }
+
+ if (future.getRead() > 0) {
+ ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+
+ // Schedule next read
+ buf = new Buffer(BUFFER_SIZE);
+ currentReadFuture = asyncOut.read(buf);
+ currentReadFuture.addListener(this);
+ }
+ }
+
+ private void invokeDisconnect() {
+ try {
+ asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception e) {
+ // This should not happen
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ // Remove self as listener on close to prevent reading from closed input
+ if(currentReadFuture != null) {
+ currentReadFuture.removeListener(this);
+ }
+
+ asyncOut = null;
+ }
+}
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final AuthenticationHandler authenticationHandler;
private final SshClient sshClient;
- private SshReadAsyncListener sshReadAsyncListener;
- private SshWriteAsyncHandler sshWriteAsyncHandler;
+ private AsyncSshHanderReader sshReadAsyncListener;
+ private AsyncSshHandlerWriter sshWriteAsyncHandler;
private ClientChannel channel;
private ClientSession session;
connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+ sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
// if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
if(channel != null) {
- sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+ sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
ctx.fireChannelActive();
}
}
ctx.fireChannelInactive();
}
- /**
- * Listener over async input stream from SSH session.
- * This listeners schedules reads in a loop until the session is closed or read fails.
- */
- private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
- private static final int BUFFER_SIZE = 8192;
-
- private final ChannelOutboundHandler asyncSshHandler;
- private final ChannelHandlerContext ctx;
-
- private IoInputStream asyncOut;
- private Buffer buf;
- private IoReadFuture currentReadFuture;
-
- public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
- this.asyncSshHandler = asyncSshHandler;
- this.ctx = ctx;
- this.asyncOut = asyncOut;
- buf = new Buffer(BUFFER_SIZE);
- asyncOut.read(buf).addListener(this);
- }
-
- @Override
- public synchronized void operationComplete(final IoReadFuture future) {
- if(future.getException() != null) {
- if(asyncOut.isClosed() || asyncOut.isClosing()) {
- // Ssh dropped
- logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
- } else {
- logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- }
- invokeDisconnect();
- return;
- }
-
- if (future.getRead() > 0) {
- ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
- // Schedule next read
- buf = new Buffer(BUFFER_SIZE);
- currentReadFuture = asyncOut.read(buf);
- currentReadFuture.addListener(this);
- }
- }
-
- private void invokeDisconnect() {
- try {
- asyncSshHandler.disconnect(ctx, ctx.newPromise());
- } catch (final Exception e) {
- // This should not happen
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public synchronized void close() {
- // Remove self as listener on close to prevent reading from closed input
- if(currentReadFuture != null) {
- currentReadFuture.removeListener(this);
- }
-
- asyncOut = null;
- }
- }
-
- private static final class SshWriteAsyncHandler implements AutoCloseable {
- public static final int MAX_PENDING_WRITES = 100;
-
- private final ChannelOutboundHandler channelHandler;
- private IoOutputStream asyncIn;
-
- // Counter that holds the amount of pending write messages
- // Pending write can occur in case remote window is full
- // In such case, we need to wait for the pending write to finish
- private int pendingWriteCounter;
- // Last write future, that can be pending
- private IoWriteFuture lastWriteFuture;
-
- public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
- this.channelHandler = channelHandler;
- this.asyncIn = asyncIn;
- }
-
- int c = 0;
-
- public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
- try {
- if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
- // If we are closed/closing, set immediate fail
- promise.setFailure(new IllegalStateException("Channel closed"));
- } else {
- lastWriteFuture = asyncIn.write(toBuffer(msg));
- lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
- @Override
- public void operationComplete(final IoWriteFuture future) {
- ((ByteBuf) msg).release();
-
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- } else {
- promise.setFailure(future.getException());
- }
-
- // Reset last pending future
- synchronized (SshWriteAsyncHandler.this) {
- lastWriteFuture = null;
- }
- }
- });
- }
- } catch (final WritePendingException e) {
- // Check limit for pending writes
- pendingWriteCounter++;
- if(pendingWriteCounter > MAX_PENDING_WRITES) {
- promise.setFailure(e);
- handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
- ", remote window is not getting read or is too small"));
- }
-
- // We need to reset buffer read index, since we've already read it when we tried to write it the first time
- ((ByteBuf) msg).resetReaderIndex();
- logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
- // In case of pending, re-invoke write after pending is finished
- Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
- lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(final IoWriteFuture future) {
- // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
- // External thread could trigger write on this instance while we are on this line
- // Verify
- if (future.isWritten()) {
- synchronized (SshWriteAsyncHandler.this) {
- // Pending done, decrease counter
- pendingWriteCounter--;
- write(ctx, msg, promise);
- }
- } else {
- // Cannot reschedule pending, fail
- handlePendingFailed(ctx, e);
- }
- }
-
- });
- }
- }
-
- private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
- logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
- try {
- channelHandler.disconnect(ctx, ctx.newPromise());
- } catch (final Exception ex) {
- // This should not happen
- throw new IllegalStateException(ex);
- }
- }
-
- @Override
- public void close() {
- asyncIn = null;
- }
-
- private Buffer toBuffer(final Object msg) {
- // TODO Buffer vs ByteBuf translate, Can we handle that better ?
- Preconditions.checkState(msg instanceof ByteBuf);
- final ByteBuf byteBuf = (ByteBuf) msg;
- final byte[] temp = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
- return new Buffer(temp);
- }
-
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
+ * Also handles pending writes by caching requests until pending state is over.
+ */
+final class AsyncSshHandlerWriter implements AutoCloseable {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(AsyncSshHandlerWriter.class);
+
+ // public static final int MAX_PENDING_WRITES = 1000;
+ // TODO implement Limiting mechanism for pending writes
+ // But there is a possible issue with limiting:
+ // 1. What to do when queue is full ? Immediate Fail for every request ?
+ // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
+ // when we send/queue 1 chunk and fail the other chunks
+
+ private IoOutputStream asyncIn;
+
+ // Order has to be preserved for queued writes
+ private final Deque<PendingWriteRequest> pending = new LinkedList<>();
+
+ public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
+ this.asyncIn = asyncIn;
+ }
+
+ public synchronized void write(final ChannelHandlerContext ctx,
+ final Object msg, final ChannelPromise promise) {
+ // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+ // If we are closed/closing, set immediate fail
+ if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+ promise.setFailure(new IllegalStateException("Channel closed"));
+ } else {
+ final ByteBuf byteBufMsg = (ByteBuf) msg;
+ if (pending.isEmpty() == false) {
+ queueRequest(ctx, byteBufMsg, promise);
+ return;
+ }
+
+ writeWithPendingDetection(ctx, promise, byteBufMsg);
+ }
+ }
+
+ private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+ try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+ }
+ asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
+
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+ ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+ }
+
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ } else {
+ logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+ promise.setFailure(future.getException());
+ }
+
+ // Not needed anymore, release
+ byteBufMsg.release();
+
+ // Check pending queue and schedule next
+ // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+ writePendingIfAny();
+ }
+ });
+ } catch (final WritePendingException e) {
+ queueRequest(ctx, byteBufMsg, promise);
+ }
+ }
+
+ private synchronized void writePendingIfAny() {
+ if (pending.peek() == null) {
+ return;
+ }
+
+ // In case of pending, reschedule next message from queue
+ final PendingWriteRequest pendingWrite = pending.poll();
+ final ByteBuf msg = pendingWrite.msg;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+ }
+
+ writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+ }
+
+ private static String byteBufToString(final ByteBuf msg) {
+ msg.resetReaderIndex();
+ final String s = msg.toString(Charsets.UTF_8);
+ msg.resetReaderIndex();
+ return s;
+ }
+
+ private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+// try {
+ logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+ }
+ new PendingWriteRequest(ctx, msg, promise).pend(pending);
+// } catch (final Exception ex) {
+// logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+// msg.release();
+// promise.setFailure(ex);
+// }
+ }
+
+ @Override
+ public synchronized void close() {
+ asyncIn = null;
+ }
+
+ private Buffer toBuffer(final ByteBuf msg) {
+ // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+ final byte[] temp = new byte[msg.readableBytes()];
+ msg.readBytes(temp, 0, msg.readableBytes());
+ return new Buffer(temp);
+ }
+
+ private static final class PendingWriteRequest {
+ private final ChannelHandlerContext ctx;
+ private final ByteBuf msg;
+ private final ChannelPromise promise;
+
+ public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+ this.ctx = ctx;
+ // Reset reader index, last write (failed) attempt moved index to the end
+ msg.resetReaderIndex();
+ this.msg = msg;
+ this.promise = promise;
+ }
+
+ public void pend(final Queue<PendingWriteRequest> pending) {
+ // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+ // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+ // pending.size(), ctx.channel());
+ Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
+ pending.size(), ctx.channel());
+ }
+ }
+}
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.channels.WritePendingException;
import org.apache.sshd.ClientChannel;
import org.apache.sshd.ClientSession;
import org.apache.sshd.SshClient;
import org.apache.sshd.common.util.Buffer;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mock;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
// make first write stop pending
firstWriteListener.operationComplete(ioWriteFuture);
- // intercept third listener, this is regular listener for second write to determine success or failure
- final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
// notify listener for second write that pending has ended
pendingListener.get().operationComplete(ioWriteFuture);
- // Notify third listener (regular listener for second write) that second write succeeded
- afterPendingListener.get().operationComplete(ioWriteFuture);
// verify both write promises successful
verify(firstWritePromise).setSuccess();
verify(secondWritePromise).setSuccess();
}
+ @Ignore("Pending queue is not limited")
@Test
public void testWritePendingMax() throws Exception {
asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
final ChannelPromise secondWritePromise = getMockedPromise();
// now make write throw pending exception
doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 1001; i++) {
asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
}
- verify(ctx).fireChannelInactive();
+ verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
}
@Test