* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.nettyutil.handler.ssh.client;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.LinkedList;
-import java.util.Queue;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Also handles pending writes by caching requests until pending state is over.
*/
public final class AsyncSshHandlerWriter implements AutoCloseable {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(AsyncSshHandlerWriter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class);
+ private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
// public static final int MAX_PENDING_WRITES = 1000;
// TODO implement Limiting mechanism for pending writes
// Order has to be preserved for queued writes
private final Deque<PendingWriteRequest> pending = new LinkedList<>();
+ @GuardedBy("asyncInLock")
+ private boolean isWriteExecuted = false;
+
public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
this.asyncIn = asyncIn;
}
- public void write(final ChannelHandlerContext ctx,
- final Object msg, final ChannelPromise promise) {
+ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
if (asyncIn == null) {
promise.setFailure(new IllegalStateException("Channel closed"));
return;
promise.setFailure(new IllegalStateException("Channel closed"));
} else {
final ByteBuf byteBufMsg = (ByteBuf) msg;
- if (!pending.isEmpty()) {
+ if (isWriteExecuted) {
queueRequest(ctx, byteBufMsg, promise);
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
}
- asyncIn.write(toBuffer(byteBufMsg)).addListener(future -> {
+
+ isWriteExecuted = true;
+
+ asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> {
// synchronized block due to deadlock that happens on ssh window resize
// writes and pending writes would lock the underlyinch channel session
// window resize write would try to write the message on an already locked channelSession,
promise.setFailure(future.getException());
}
- // Not needed anymore, release
- byteBufMsg.release();
-
//rescheduling message from queue after successfully sent
if (wasPending) {
byteBufMsg.resetReaderIndex();
pending.remove();
}
+
+ // Not needed anymore, release
+ byteBufMsg.release();
}
// Check pending queue and schedule next
writePendingIfAny();
});
- } catch (final WritePendingException e) {
-
+ } catch (final IOException | WritePendingException e) {
if (!wasPending) {
queueRequest(ctx, byteBufMsg, promise);
}
private void writePendingIfAny() {
synchronized (asyncInLock) {
- if (pending.peek() == null) {
+ final PendingWriteRequest pendingWrite = pending.peek();
+ if (pendingWrite == null) {
+ isWriteExecuted = false;
return;
}
- final PendingWriteRequest pendingWrite = pending.peek();
final ByteBuf msg = pendingWrite.msg;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing pending request on channel: {}, message: {}",
}
public static String byteBufToString(final ByteBuf msg) {
- final String s = msg.toString(StandardCharsets.UTF_8);
+ final String message = msg.toString(StandardCharsets.UTF_8);
msg.resetReaderIndex();
- return s;
+ Matcher matcher = NON_ASCII.matcher(message);
+ return matcher.replaceAll(data -> {
+ StringBuilder buf = new StringBuilder();
+ buf.append("\"");
+ for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
+ buf.append(String.format("%02X", b));
+ }
+ buf.append("\"");
+ return buf.toString();
+ });
}
private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
-// try {
LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
if (LOG.isTraceEnabled()) {
LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
}
- new PendingWriteRequest(ctx, msg, promise).pend(pending);
+
+// try {
+ final var req = new PendingWriteRequest(ctx, msg, promise);
+ // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+ // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+ // pending.size(), ctx.channel());
+ checkState(pending.offer(req), "Cannot pend another request write (pending count: %s) on channel: %s",
+ pending.size(), ctx.channel());
// } catch (final Exception ex) {
// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
// ctx.channel(), ex, byteBufToString(msg));
this.msg = msg;
this.promise = promise;
}
-
- public void pend(final Queue<PendingWriteRequest> pending) {
- // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
- // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
- // pending.size(), ctx.channel());
- Preconditions.checkState(pending.offer(this),
- "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
- }
}
}