549bbe18cb96dc67e99bce2dff6134717f82601e
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.nio.charset.StandardCharsets;
17 import java.util.Deque;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.regex.Matcher;
21 import java.util.regex.Pattern;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
24 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
25 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
26 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
32  * Also handles pending writes by caching requests until pending state is over.
33  */
34 public final class AsyncSshHandlerWriter implements AutoCloseable {
35
36     private static final Logger LOG = LoggerFactory
37             .getLogger(AsyncSshHandlerWriter.class);
38
39     private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
40
41     // public static final int MAX_PENDING_WRITES = 1000;
42     // TODO implement Limiting mechanism for pending writes
43     // But there is a possible issue with limiting:
44     // 1. What to do when queue is full ? Immediate Fail for every request ?
45     // 2. At this level we might be dealing with Chunks of messages(not whole messages)
46     // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
47
48     private final Object asyncInLock = new Object();
49     private volatile IoOutputStream asyncIn;
50
51     // Order has to be preserved for queued writes
52     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
53
54     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
55         this.asyncIn = asyncIn;
56     }
57
58     @GuardedBy("asyncInLock")
59     private boolean isWriteExecuted = false;
60
61     public void write(final ChannelHandlerContext ctx,
62             final Object msg, final ChannelPromise promise) {
63         if (asyncIn == null) {
64             promise.setFailure(new IllegalStateException("Channel closed"));
65             return;
66         }
67         // synchronized block due to deadlock that happens on ssh window resize
68         // writes and pending writes would lock the underlyinch channel session
69         // window resize write would try to write the message on an already locked channelSession
70         // while the pending write was in progress from the write callback
71         synchronized (asyncInLock) {
72             // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
73             // If we are closed/closing, set immediate fail
74             if (asyncIn.isClosed() || asyncIn.isClosing()) {
75                 promise.setFailure(new IllegalStateException("Channel closed"));
76             } else {
77                 final ByteBuf byteBufMsg = (ByteBuf) msg;
78                 if (isWriteExecuted) {
79                     queueRequest(ctx, byteBufMsg, promise);
80                     return;
81                 }
82
83                 writeWithPendingDetection(ctx, promise, byteBufMsg, false);
84             }
85         }
86     }
87
88     //sending message with pending
89     //if resending message not succesfull, then attribute wasPending is true
90     private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
91                                            final ByteBuf byteBufMsg, final boolean wasPending) {
92         try {
93
94             if (LOG.isTraceEnabled()) {
95                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
96             }
97
98             isWriteExecuted = true;
99
100             asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> {
101                 // synchronized block due to deadlock that happens on ssh window resize
102                 // writes and pending writes would lock the underlyinch channel session
103                 // window resize write would try to write the message on an already locked channelSession,
104                 // while the pending write was in progress from the write callback
105                 synchronized (asyncInLock) {
106                     if (LOG.isTraceEnabled()) {
107                         LOG.trace(
108                             "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
109                             ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
110                     }
111
112                     // Notify success or failure
113                     if (future.isWritten()) {
114                         promise.setSuccess();
115                     } else {
116                         LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
117                                 byteBufToString(byteBufMsg), future.getException());
118                         promise.setFailure(future.getException());
119                     }
120
121                     //rescheduling message from queue after successfully sent
122                     if (wasPending) {
123                         byteBufMsg.resetReaderIndex();
124                         pending.remove();
125                     }
126
127                     // Not needed anymore, release
128                     byteBufMsg.release();
129                 }
130
131                 // Check pending queue and schedule next
132                 // At this time we are guaranteed that we are not in pending state anymore
133                 // so the next request should succeed
134                 writePendingIfAny();
135             });
136
137         } catch (final IOException | WritePendingException e) {
138             if (!wasPending) {
139                 queueRequest(ctx, byteBufMsg, promise);
140             }
141         }
142     }
143
144     private void writePendingIfAny() {
145         synchronized (asyncInLock) {
146             if (pending.peek() == null) {
147                 isWriteExecuted = false;
148                 return;
149             }
150
151             final PendingWriteRequest pendingWrite = pending.peek();
152             final ByteBuf msg = pendingWrite.msg;
153             if (LOG.isTraceEnabled()) {
154                 LOG.trace("Writing pending request on channel: {}, message: {}",
155                         pendingWrite.ctx.channel(), byteBufToString(msg));
156             }
157
158             writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
159         }
160     }
161
162     public static String byteBufToString(final ByteBuf msg) {
163         final String message = msg.toString(StandardCharsets.UTF_8);
164         msg.resetReaderIndex();
165         Matcher matcher = NON_ASCII.matcher(message);
166         return matcher.replaceAll((data) -> {
167             StringBuilder buf = new StringBuilder();
168             buf.append("\"");
169             for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
170                 buf.append(String.format("%02X", b));
171             }
172             buf.append("\"");
173             return buf.toString();
174         });
175     }
176
177     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
178 //        try {
179         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
180         if (LOG.isTraceEnabled()) {
181             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
182         }
183         new PendingWriteRequest(ctx, msg, promise).pend(pending);
184 //        } catch (final Exception ex) {
185 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
186 //                    ctx.channel(), ex, byteBufToString(msg));
187 //            msg.release();
188 //            promise.setFailure(ex);
189 //        }
190     }
191
192     @Override
193     public void close() {
194         asyncIn = null;
195     }
196
197     private static Buffer toBuffer(final ByteBuf msg) {
198         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
199         msg.resetReaderIndex();
200         final byte[] temp = new byte[msg.readableBytes()];
201         msg.readBytes(temp, 0, msg.readableBytes());
202         return new ByteArrayBuffer(temp);
203     }
204
205     private static final class PendingWriteRequest {
206         private final ChannelHandlerContext ctx;
207         private final ByteBuf msg;
208         private final ChannelPromise promise;
209
210         PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
211             this.ctx = ctx;
212             // Reset reader index, last write (failed) attempt moved index to the end
213             msg.resetReaderIndex();
214             this.msg = msg;
215             this.promise = promise;
216         }
217
218         public void pend(final Queue<PendingWriteRequest> pending) {
219             // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
220             // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
221             // pending.size(), ctx.channel());
222             Preconditions.checkState(pending.offer(this),
223                 "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
224         }
225     }
226 }