Use final field as lock in AsyncSshHandlerWriter
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.nio.charset.StandardCharsets;
16 import java.util.Deque;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import org.apache.sshd.common.future.SshFutureListener;
20 import org.apache.sshd.common.io.IoOutputStream;
21 import org.apache.sshd.common.io.IoWriteFuture;
22 import org.apache.sshd.common.io.WritePendingException;
23 import org.apache.sshd.common.util.Buffer;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
29  * Also handles pending writes by caching requests until pending state is over.
30  */
31 public final class AsyncSshHandlerWriter implements AutoCloseable {
32
33     private static final Logger LOG = LoggerFactory
34             .getLogger(AsyncSshHandlerWriter.class);
35
36     // public static final int MAX_PENDING_WRITES = 1000;
37     // TODO implement Limiting mechanism for pending writes
38     // But there is a possible issue with limiting:
39     // 1. What to do when queue is full ? Immediate Fail for every request ?
40     // 2. At this level we might be dealing with Chunks of messages(not whole messages)
41     // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
42
43     private final Object asyncInLock = new Object();
44     private volatile IoOutputStream asyncIn;
45
46     // Order has to be preserved for queued writes
47     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
48
49     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
50         this.asyncIn = asyncIn;
51     }
52
53     public void write(final ChannelHandlerContext ctx,
54             final Object msg, final ChannelPromise promise) {
55         if (asyncIn == null) {
56             promise.setFailure(new IllegalStateException("Channel closed"));
57             return;
58         }
59         // synchronized block due to deadlock that happens on ssh window resize
60         // writes and pending writes would lock the underlyinch channel session
61         // window resize write would try to write the message on an already locked channelSession
62         // while the pending write was in progress from the write callback
63         synchronized (asyncInLock) {
64             // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
65             // If we are closed/closing, set immediate fail
66             if (asyncIn.isClosed() || asyncIn.isClosing()) {
67                 promise.setFailure(new IllegalStateException("Channel closed"));
68             } else {
69                 final ByteBuf byteBufMsg = (ByteBuf) msg;
70                 if (pending.isEmpty() == false) {
71                     queueRequest(ctx, byteBufMsg, promise);
72                     return;
73                 }
74
75                 writeWithPendingDetection(ctx, promise, byteBufMsg, false);
76             }
77         }
78     }
79
80     //sending message with pending
81     //if resending message not succesfull, then attribute wasPending is true
82     private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
83                                            final ByteBuf byteBufMsg, final boolean wasPending) {
84         try {
85
86             if (LOG.isTraceEnabled()) {
87                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
88             }
89             asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
90
91                 @Override
92                 public void operationComplete(final IoWriteFuture future) {
93                     // synchronized block due to deadlock that happens on ssh window resize
94                     // writes and pending writes would lock the underlyinch channel session
95                     // window resize write would try to write the message on an already locked channelSession,
96                     // while the pending write was in progress from the write callback
97                     synchronized (asyncInLock) {
98                         if (LOG.isTraceEnabled()) {
99                             LOG.trace(
100                                 "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
101                                 ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
102                         }
103
104                         // Notify success or failure
105                         if (future.isWritten()) {
106                             promise.setSuccess();
107                         } else {
108                             LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
109                                     byteBufToString(byteBufMsg), future.getException());
110                             promise.setFailure(future.getException());
111                         }
112
113                         // Not needed anymore, release
114                         byteBufMsg.release();
115
116                         //rescheduling message from queue after successfully sent
117                         if (wasPending) {
118                             byteBufMsg.resetReaderIndex();
119                             pending.remove();
120                         }
121                     }
122
123                     // Check pending queue and schedule next
124                     // At this time we are guaranteed that we are not in pending state anymore
125                     // so the next request should succeed
126                     writePendingIfAny();
127                 }
128             });
129
130         } catch (final WritePendingException e) {
131
132             if (wasPending == false) {
133                 queueRequest(ctx, byteBufMsg, promise);
134             }
135         }
136     }
137
138     private void writePendingIfAny() {
139         synchronized (asyncInLock) {
140             if (pending.peek() == null) {
141                 return;
142             }
143
144             final PendingWriteRequest pendingWrite = pending.peek();
145             final ByteBuf msg = pendingWrite.msg;
146             if (LOG.isTraceEnabled()) {
147                 LOG.trace("Writing pending request on channel: {}, message: {}",
148                         pendingWrite.ctx.channel(), byteBufToString(msg));
149             }
150
151             writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
152         }
153     }
154
155     public static String byteBufToString(final ByteBuf msg) {
156         final String s = msg.toString(StandardCharsets.UTF_8);
157         msg.resetReaderIndex();
158         return s;
159     }
160
161     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
162 //        try {
163         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
164         if (LOG.isTraceEnabled()) {
165             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
166         }
167         new PendingWriteRequest(ctx, msg, promise).pend(pending);
168 //        } catch (final Exception ex) {
169 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
170 //                    ctx.channel(), ex, byteBufToString(msg));
171 //            msg.release();
172 //            promise.setFailure(ex);
173 //        }
174     }
175
176     @Override
177     public void close() {
178         asyncIn = null;
179     }
180
181     private static Buffer toBuffer(final ByteBuf msg) {
182         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
183         msg.resetReaderIndex();
184         final byte[] temp = new byte[msg.readableBytes()];
185         msg.readBytes(temp, 0, msg.readableBytes());
186         return new Buffer(temp);
187     }
188
189     private static final class PendingWriteRequest {
190         private final ChannelHandlerContext ctx;
191         private final ByteBuf msg;
192         private final ChannelPromise promise;
193
194         PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
195             this.ctx = ctx;
196             // Reset reader index, last write (failed) attempt moved index to the end
197             msg.resetReaderIndex();
198             this.msg = msg;
199             this.promise = promise;
200         }
201
202         public void pend(final Queue<PendingWriteRequest> pending) {
203             // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
204             // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
205             // pending.size(), ctx.channel());
206             Preconditions.checkState(pending.offer(this),
207                 "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
208         }
209     }
210 }