412e94d0aedc259f72fc51c188c979809b988512
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
9
10 import static com.google.common.base.Preconditions.checkState;
11
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.nio.charset.StandardCharsets;
17 import java.util.Deque;
18 import java.util.LinkedList;
19 import java.util.regex.Matcher;
20 import java.util.regex.Pattern;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
23 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
24 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
25 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
31  * Also handles pending writes by caching requests until pending state is over.
32  */
33 @Deprecated(since = "7.0.0", forRemoval = true)
34 public final class AsyncSshHandlerWriter implements AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class);
36     private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
37
38     // public static final int MAX_PENDING_WRITES = 1000;
39     // TODO implement Limiting mechanism for pending writes
40     // But there is a possible issue with limiting:
41     // 1. What to do when queue is full ? Immediate Fail for every request ?
42     // 2. At this level we might be dealing with Chunks of messages(not whole messages)
43     // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
44
45     private final Object asyncInLock = new Object();
46     private volatile IoOutputStream asyncIn;
47
48     // Order has to be preserved for queued writes
49     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
50
51     @GuardedBy("asyncInLock")
52     private boolean isWriteExecuted = false;
53
54     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
55         this.asyncIn = asyncIn;
56     }
57
58     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
59         if (asyncIn == null) {
60             promise.setFailure(new IllegalStateException("Channel closed"));
61             return;
62         }
63         // synchronized block due to deadlock that happens on ssh window resize
64         // writes and pending writes would lock the underlyinch channel session
65         // window resize write would try to write the message on an already locked channelSession
66         // while the pending write was in progress from the write callback
67         synchronized (asyncInLock) {
68             // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
69             // If we are closed/closing, set immediate fail
70             if (asyncIn.isClosed() || asyncIn.isClosing()) {
71                 promise.setFailure(new IllegalStateException("Channel closed"));
72             } else {
73                 final ByteBuf byteBufMsg = (ByteBuf) msg;
74                 if (isWriteExecuted) {
75                     queueRequest(ctx, byteBufMsg, promise);
76                     return;
77                 }
78
79                 writeWithPendingDetection(ctx, promise, byteBufMsg, false);
80             }
81         }
82     }
83
84     //sending message with pending
85     //if resending message not succesfull, then attribute wasPending is true
86     private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
87                                            final ByteBuf byteBufMsg, final boolean wasPending) {
88         try {
89
90             if (LOG.isTraceEnabled()) {
91                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
92             }
93
94             isWriteExecuted = true;
95
96             asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> {
97                 // synchronized block due to deadlock that happens on ssh window resize
98                 // writes and pending writes would lock the underlying channel session
99                 // window resize write would try to write the message on an already locked channelSession,
100                 // while the pending write was in progress from the write callback
101                 synchronized (asyncInLock) {
102                     final var cause = future.getException();
103                     if (LOG.isTraceEnabled()) {
104                         LOG.trace("Ssh write request finished on channel: {} with ex: {}, message: {}", ctx.channel(),
105                             cause, byteBufToString(byteBufMsg));
106                     }
107
108                     // Notify success or failure
109                     if (cause != null) {
110                         LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
111                             byteBufToString(byteBufMsg), cause);
112                         promise.setFailure(cause);
113                     } else {
114                         promise.setSuccess();
115                     }
116
117                     //rescheduling message from queue after successfully sent
118                     if (wasPending) {
119                         byteBufMsg.resetReaderIndex();
120                         pending.remove();
121                     }
122
123                     // Not needed anymore, release
124                     byteBufMsg.release();
125                 }
126
127                 // Check pending queue and schedule next
128                 // At this time we are guaranteed that we are not in pending state anymore
129                 // so the next request should succeed
130                 writePendingIfAny();
131             });
132
133         } catch (final IOException | WritePendingException e) {
134             if (!wasPending) {
135                 queueRequest(ctx, byteBufMsg, promise);
136             }
137         }
138     }
139
140     private void writePendingIfAny() {
141         synchronized (asyncInLock) {
142             final PendingWriteRequest pendingWrite = pending.peek();
143             if (pendingWrite == null) {
144                 isWriteExecuted = false;
145                 return;
146             }
147
148             final ByteBuf msg = pendingWrite.msg;
149             if (LOG.isTraceEnabled()) {
150                 LOG.trace("Writing pending request on channel: {}, message: {}",
151                         pendingWrite.ctx.channel(), byteBufToString(msg));
152             }
153
154             writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
155         }
156     }
157
158     public static String byteBufToString(final ByteBuf msg) {
159         final String message = msg.toString(StandardCharsets.UTF_8);
160         msg.resetReaderIndex();
161         Matcher matcher = NON_ASCII.matcher(message);
162         return matcher.replaceAll(data -> {
163             StringBuilder buf = new StringBuilder();
164             buf.append("\"");
165             for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
166                 buf.append(String.format("%02X", b));
167             }
168             buf.append("\"");
169             return buf.toString();
170         });
171     }
172
173     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
174         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
175         if (LOG.isTraceEnabled()) {
176             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
177         }
178
179 //      try {
180         final var req = new PendingWriteRequest(ctx, msg, promise);
181         // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
182         // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
183         // pending.size(), ctx.channel());
184         checkState(pending.offer(req), "Cannot pend another request write (pending count: %s) on channel: %s",
185                 pending.size(), ctx.channel());
186 //        } catch (final Exception ex) {
187 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
188 //                    ctx.channel(), ex, byteBufToString(msg));
189 //            msg.release();
190 //            promise.setFailure(ex);
191 //        }
192     }
193
194     @Override
195     public void close() {
196         asyncIn = null;
197     }
198
199     private static Buffer toBuffer(final ByteBuf msg) {
200         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
201         msg.resetReaderIndex();
202         final byte[] temp = new byte[msg.readableBytes()];
203         msg.readBytes(temp, 0, msg.readableBytes());
204         return new ByteArrayBuffer(temp);
205     }
206
207     private static final class PendingWriteRequest {
208         private final ChannelHandlerContext ctx;
209         private final ByteBuf msg;
210         private final ChannelPromise promise;
211
212         PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
213             this.ctx = ctx;
214             // Reset reader index, last write (failed) attempt moved index to the end
215             msg.resetReaderIndex();
216             this.msg = msg;
217             this.promise = promise;
218         }
219     }
220 }