bab4a7be632ffd58b98bb0208c8096bdffd5d4a5
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.nio.charset.StandardCharsets;
16 import java.util.Deque;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import org.apache.sshd.common.io.IoOutputStream;
20 import org.apache.sshd.common.io.WritePendingException;
21 import org.apache.sshd.common.util.buffer.Buffer;
22 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
28  * Also handles pending writes by caching requests until pending state is over.
29  */
30 public final class AsyncSshHandlerWriter implements AutoCloseable {
31
32     private static final Logger LOG = LoggerFactory
33             .getLogger(AsyncSshHandlerWriter.class);
34
35     // public static final int MAX_PENDING_WRITES = 1000;
36     // TODO implement Limiting mechanism for pending writes
37     // But there is a possible issue with limiting:
38     // 1. What to do when queue is full ? Immediate Fail for every request ?
39     // 2. At this level we might be dealing with Chunks of messages(not whole messages)
40     // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
41
42     private final Object asyncInLock = new Object();
43     private volatile IoOutputStream asyncIn;
44
45     // Order has to be preserved for queued writes
46     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
47
48     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
49         this.asyncIn = asyncIn;
50     }
51
52     public void write(final ChannelHandlerContext ctx,
53             final Object msg, final ChannelPromise promise) {
54         if (asyncIn == null) {
55             promise.setFailure(new IllegalStateException("Channel closed"));
56             return;
57         }
58         // synchronized block due to deadlock that happens on ssh window resize
59         // writes and pending writes would lock the underlyinch channel session
60         // window resize write would try to write the message on an already locked channelSession
61         // while the pending write was in progress from the write callback
62         synchronized (asyncInLock) {
63             // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
64             // If we are closed/closing, set immediate fail
65             if (asyncIn.isClosed() || asyncIn.isClosing()) {
66                 promise.setFailure(new IllegalStateException("Channel closed"));
67             } else {
68                 final ByteBuf byteBufMsg = (ByteBuf) msg;
69                 if (!pending.isEmpty()) {
70                     queueRequest(ctx, byteBufMsg, promise);
71                     return;
72                 }
73
74                 writeWithPendingDetection(ctx, promise, byteBufMsg, false);
75             }
76         }
77     }
78
79     //sending message with pending
80     //if resending message not succesfull, then attribute wasPending is true
81     private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
82                                            final ByteBuf byteBufMsg, final boolean wasPending) {
83         try {
84
85             if (LOG.isTraceEnabled()) {
86                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
87             }
88             asyncIn.write(toBuffer(byteBufMsg)).addListener(future -> {
89                 // synchronized block due to deadlock that happens on ssh window resize
90                 // writes and pending writes would lock the underlyinch channel session
91                 // window resize write would try to write the message on an already locked channelSession,
92                 // while the pending write was in progress from the write callback
93                 synchronized (asyncInLock) {
94                     if (LOG.isTraceEnabled()) {
95                         LOG.trace(
96                             "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
97                             ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
98                     }
99
100                     // Notify success or failure
101                     if (future.isWritten()) {
102                         promise.setSuccess();
103                     } else {
104                         LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
105                                 byteBufToString(byteBufMsg), future.getException());
106                         promise.setFailure(future.getException());
107                     }
108
109                     // Not needed anymore, release
110                     byteBufMsg.release();
111
112                     //rescheduling message from queue after successfully sent
113                     if (wasPending) {
114                         byteBufMsg.resetReaderIndex();
115                         pending.remove();
116                     }
117                 }
118
119                 // Check pending queue and schedule next
120                 // At this time we are guaranteed that we are not in pending state anymore
121                 // so the next request should succeed
122                 writePendingIfAny();
123             });
124
125         } catch (final WritePendingException e) {
126
127             if (!wasPending) {
128                 queueRequest(ctx, byteBufMsg, promise);
129             }
130         }
131     }
132
133     private void writePendingIfAny() {
134         synchronized (asyncInLock) {
135             if (pending.peek() == null) {
136                 return;
137             }
138
139             final PendingWriteRequest pendingWrite = pending.peek();
140             final ByteBuf msg = pendingWrite.msg;
141             if (LOG.isTraceEnabled()) {
142                 LOG.trace("Writing pending request on channel: {}, message: {}",
143                         pendingWrite.ctx.channel(), byteBufToString(msg));
144             }
145
146             writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
147         }
148     }
149
150     public static String byteBufToString(final ByteBuf msg) {
151         final String s = msg.toString(StandardCharsets.UTF_8);
152         msg.resetReaderIndex();
153         return s;
154     }
155
156     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
157 //        try {
158         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
159         if (LOG.isTraceEnabled()) {
160             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
161         }
162         new PendingWriteRequest(ctx, msg, promise).pend(pending);
163 //        } catch (final Exception ex) {
164 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
165 //                    ctx.channel(), ex, byteBufToString(msg));
166 //            msg.release();
167 //            promise.setFailure(ex);
168 //        }
169     }
170
171     @Override
172     public void close() {
173         asyncIn = null;
174     }
175
176     private static Buffer toBuffer(final ByteBuf msg) {
177         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
178         msg.resetReaderIndex();
179         final byte[] temp = new byte[msg.readableBytes()];
180         msg.readBytes(temp, 0, msg.readableBytes());
181         return new ByteArrayBuffer(temp);
182     }
183
184     private static final class PendingWriteRequest {
185         private final ChannelHandlerContext ctx;
186         private final ByteBuf msg;
187         private final ChannelPromise promise;
188
189         PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
190             this.ctx = ctx;
191             // Reset reader index, last write (failed) attempt moved index to the end
192             msg.resetReaderIndex();
193             this.msg = msg;
194             this.promise = promise;
195         }
196
197         public void pend(final Queue<PendingWriteRequest> pending) {
198             // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
199             // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
200             // pending.size(), ctx.channel());
201             Preconditions.checkState(pending.offer(this),
202                 "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
203         }
204     }
205 }