2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10 import static com.google.common.base.Preconditions.checkState;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.nio.charset.StandardCharsets;
17 import java.util.Deque;
18 import java.util.LinkedList;
19 import java.util.regex.Matcher;
20 import java.util.regex.Pattern;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
23 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
24 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
25 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
30 * Async SSH writer. Takes messages ({@code ByteBuf}s) and sends them encrypted to the remote server.
31 * Also handles pending writes by queueing requests until the pending state is over.
33 public final class AsyncSshHandlerWriter implements AutoCloseable {
// Logger of this class.
34 private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class);
// Matches runs of one or more characters other than printable ASCII (0x20-0x7E), CR (0x0D) and LF (0x0A);
// used by byteBufToString() to find the parts of a message that are replaced by hex digits.
35 private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
37 // public static final int MAX_PENDING_WRITES = 1000;
38 // TODO implement a limiting mechanism for pending writes
39 // But there is a possible issue with limiting:
40 // 1. What to do when the queue is full? Immediately fail every request?
41 // 2. At this level we might be dealing with chunks of messages (not whole messages)
42 // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
// Monitor that write(), the write-completion listener and writePendingIfAny() synchronize on.
44 private final Object asyncInLock = new Object();
// Output stream the messages are written to; write() treats a null value as "Channel closed".
45 private volatile IoOutputStream asyncIn;
47 // Order has to be preserved for queued writes
48 private final Deque<PendingWriteRequest> pending = new LinkedList<>();
// True while a write is in flight: set by writeWithPendingDetection(), cleared by writePendingIfAny()
// once the pending queue is empty. While it is set, write() queues new requests instead of sending them.
50 @GuardedBy("asyncInLock")
51 private boolean isWriteExecuted = false;
/**
 * Creates a writer that sends messages to the supplied SSH output stream.
 *
 * @param asyncIn output stream the messages are written to
 */
public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
    this.asyncIn = asyncIn;
}
57 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
58 if (asyncIn == null) {
59 promise.setFailure(new IllegalStateException("Channel closed"));
62 // synchronized block due to deadlock that happens on ssh window resize
63 // writes and pending writes would lock the underlyinch channel session
64 // window resize write would try to write the message on an already locked channelSession
65 // while the pending write was in progress from the write callback
66 synchronized (asyncInLock) {
67 // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
68 // If we are closed/closing, set immediate fail
69 if (asyncIn.isClosed() || asyncIn.isClosing()) {
70 promise.setFailure(new IllegalStateException("Channel closed"));
72 final ByteBuf byteBufMsg = (ByteBuf) msg;
73 if (isWriteExecuted) {
74 queueRequest(ctx, byteBufMsg, promise);
78 writeWithPendingDetection(ctx, promise, byteBufMsg, false);
// Performs one write attempt of byteBufMsg on asyncIn and registers a listener that reports the outcome
// to the promise. write() passes wasPending=false; writePendingIfAny() passes wasPending=true.
// NOTE(review): parts of this method (the try opener, the statements that use wasPending and the closing
// braces) are not visible in this excerpt; the comments below describe the visible lines only.
83 // Sends the message with detection of the pending state
84 // if resending the message is not successful, then attribute wasPending is true
85 private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
86 final ByteBuf byteBufMsg, final boolean wasPending) {
89 if (LOG.isTraceEnabled()) {
90 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
// Mark a write as in flight; write() queues new requests while this flag is set
93 isWriteExecuted = true;
// toBuffer() copies the ByteBuf content into an SSH Buffer; the listener handles the result of the write future
95 asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> {
96 // synchronized block due to deadlock that happens on ssh window resize
97 // writes and pending writes would lock the underlying channel session
98 // window resize write would try to write the message on an already locked channelSession,
99 // while the pending write was in progress from the write callback
100 synchronized (asyncInLock) {
101 if (LOG.isTraceEnabled()) {
103 "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
104 ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
107 // Notify success or failure
108 if (future.isWritten()) {
109 promise.setSuccess();
// Failure path: log the message together with the exception and propagate the exception to the promise
111 LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
112 byteBufToString(byteBufMsg), future.getException());
113 promise.setFailure(future.getException());
116 // rescheduling message from queue after successfully sent
// NOTE(review): presumably only done for requests that came from the pending queue - confirm
118 byteBufMsg.resetReaderIndex();
122 // Not needed anymore, release
123 byteBufMsg.release();
126 // Check pending queue and schedule next
127 // At this time we are guaranteed that we are not in pending state anymore
128 // so the next request should succeed
// The write could not be started (I/O error or a write is already pending on the stream)
132 } catch (final IOException | WritePendingException e) {
// Keep the request for a later attempt
134 queueRequest(ctx, byteBufMsg, promise);
139 private void writePendingIfAny() {
140 synchronized (asyncInLock) {
141 final PendingWriteRequest pendingWrite = pending.peek();
142 if (pendingWrite == null) {
143 isWriteExecuted = false;
147 final ByteBuf msg = pendingWrite.msg;
148 if (LOG.isTraceEnabled()) {
149 LOG.trace("Writing pending request on channel: {}, message: {}",
150 pendingWrite.ctx.channel(), byteBufToString(msg));
153 writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
/**
 * Renders the content of a buffer as text, for use in log messages.
 *
 * <p>The buffer is decoded as UTF-8, and every run of characters matched by {@code NON_ASCII} is replaced
 * by hexadecimal digits. As a side effect the reader index of {@code msg} is reset.
 * NOTE(review): some lines of the replacement lambda are not visible in this excerpt, so the exact
 * replacement text may contain more than the hex digits - confirm.
 *
 * @param msg buffer to render
 * @return textual form of the buffer content
 */
157 public static String byteBufToString(final ByteBuf msg) {
158 final String message = msg.toString(StandardCharsets.UTF_8);
// Side effect on the caller's buffer: moves the reader index back to its marked position
159 msg.resetReaderIndex();
160 Matcher matcher = NON_ASCII.matcher(message);
161 return matcher.replaceAll(data -> {
162 StringBuilder buf = new StringBuilder();
// Two upper-case hex digits per byte of the matched run.
// NOTE(review): encoding the run as US-ASCII replaces characters that ASCII cannot represent with the
// charset's replacement byte, so presumably they all print as the same hex value - confirm whether
// UTF-8 was intended here.
164 for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
165 buf.append(String.format("%02X", b));
168 return buf.toString();
172 private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
173 LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
174 if (LOG.isTraceEnabled()) {
175 LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
179 final var req = new PendingWriteRequest(ctx, msg, promise);
180 // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
181 // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
182 // pending.size(), ctx.channel());
183 checkState(pending.offer(req), "Cannot pend another request write (pending count: %s) on channel: %s",
184 pending.size(), ctx.channel());
185 // } catch (final Exception ex) {
186 // LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
187 // ctx.channel(), ex, byteBufToString(msg));
189 // promise.setFailure(ex);
/**
 * Closes this writer.
 * NOTE(review): the method body is not visible in this excerpt. write() fails requests when asyncIn is
 * null, so presumably this method clears that field - confirm.
 */
194 public void close() {
198 private static Buffer toBuffer(final ByteBuf msg) {
199 // TODO Buffer vs ByteBuf translate, Can we handle that better ?
200 msg.resetReaderIndex();
201 final byte[] temp = new byte[msg.readableBytes()];
202 msg.readBytes(temp, 0, msg.readableBytes());
203 return new ByteArrayBuffer(temp);
// Holder for one queued write: the context, message and promise handed to queueRequest(), read back by
// writePendingIfAny() when the request is sent.
// NOTE(review): the assignments of ctx and msg and the closing braces are not visible in this excerpt.
206 private static final class PendingWriteRequest {
207 private final ChannelHandlerContext ctx;
208 private final ByteBuf msg;
209 private final ChannelPromise promise;
211 PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
213 // Reset reader index, last write (failed) attempt moved index to the end
214 msg.resetReaderIndex();
216 this.promise = promise;