Fix racecondition in AsyncSshHandlerWriter
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import com.google.common.base.Charsets;
12 import com.google.common.base.Preconditions;
13 import io.netty.buffer.ByteBuf;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.channel.ChannelPromise;
16 import java.util.Deque;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import org.apache.sshd.common.future.SshFutureListener;
20 import org.apache.sshd.common.io.IoOutputStream;
21 import org.apache.sshd.common.io.IoWriteFuture;
22 import org.apache.sshd.common.io.WritePendingException;
23 import org.apache.sshd.common.util.Buffer;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
29  * Also handles pending writes by caching requests until pending state is over.
30  */
31 public final class AsyncSshHandlerWriter implements AutoCloseable {
32
33     private static final Logger LOG = LoggerFactory
34             .getLogger(AsyncSshHandlerWriter.class);
35
36     // public static final int MAX_PENDING_WRITES = 1000;
37     // TODO implement Limiting mechanism for pending writes
38     // But there is a possible issue with limiting:
39     // 1. What to do when queue is full ? Immediate Fail for every request ?
40     // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
41     // when we send/queue 1 chunk and fail the other chunks
42
43     private IoOutputStream asyncIn;
44
45     // Order has to be preserved for queued writes
46     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
47
48     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
49         this.asyncIn = asyncIn;
50     }
51
52     public synchronized void write(final ChannelHandlerContext ctx,
53             final Object msg, final ChannelPromise promise) {
54         // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
55         // If we are closed/closing, set immediate fail
56         if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
57             promise.setFailure(new IllegalStateException("Channel closed"));
58         } else {
59             final ByteBuf byteBufMsg = (ByteBuf) msg;
60             if (pending.isEmpty() == false) {
61                 queueRequest(ctx, byteBufMsg, promise);
62                 return;
63             }
64
65             writeWithPendingDetection(ctx, promise, byteBufMsg, false);
66         }
67     }
68
69     //sending message with pending
70     //if resending message not succesfull, then attribute wasPending is true
71     private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
72         try {
73
74             if (LOG.isTraceEnabled()) {
75                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
76             }
77             asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
78
79                 @Override
80                 public void operationComplete(final IoWriteFuture future) {
81                     if (LOG.isTraceEnabled()) {
82                         LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
83                                 ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
84                     }
85
86                     // Notify success or failure
87                     if (future.isWritten()) {
88                         promise.setSuccess();
89                     } else {
90                         LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
91                         promise.setFailure(future.getException());
92                     }
93
94                     // Not needed anymore, release
95                     byteBufMsg.release();
96
97                     synchronized (AsyncSshHandlerWriter.this) {
98                         //rescheduling message from queue after successfully sent
99                         if (wasPending) {
100                             byteBufMsg.resetReaderIndex();
101                             pending.remove();
102                         }
103                     }
104
105                     // Check pending queue and schedule next
106                     // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
107                     writePendingIfAny();
108
109                 }
110             });
111
112         } catch (final WritePendingException e) {
113
114             if(wasPending == false){
115                 queueRequest(ctx, byteBufMsg, promise);
116             }
117         }
118     }
119
120     private synchronized void writePendingIfAny() {
121
122         if (pending.peek() == null) {
123             return;
124         }
125
126         final PendingWriteRequest pendingWrite = pending.peek();
127         final ByteBuf msg = pendingWrite.msg;
128         if (LOG.isTraceEnabled()) {
129             LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
130         }
131
132         writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
133     }
134
135     public static String byteBufToString(final ByteBuf msg) {
136         final String s = msg.toString(Charsets.UTF_8);
137         msg.resetReaderIndex();
138         return s;
139     }
140
141     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
142 //        try {
143         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
144         if (LOG.isTraceEnabled()) {
145             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
146         }
147         new PendingWriteRequest(ctx, msg, promise).pend(pending);
148 //        } catch (final Exception ex) {
149 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
150 //            msg.release();
151 //            promise.setFailure(ex);
152 //        }
153     }
154
155     @Override
156     public synchronized void close() {
157         asyncIn = null;
158     }
159
160     private Buffer toBuffer(final ByteBuf msg) {
161         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
162         msg.resetReaderIndex();
163         final byte[] temp = new byte[msg.readableBytes()];
164         msg.readBytes(temp, 0, msg.readableBytes());
165         return new Buffer(temp);
166     }
167
168     private static final class PendingWriteRequest {
169         private final ChannelHandlerContext ctx;
170         private final ByteBuf msg;
171         private final ChannelPromise promise;
172
173         public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
174             this.ctx = ctx;
175             // Reset reader index, last write (failed) attempt moved index to the end
176             msg.resetReaderIndex();
177             this.msg = msg;
178             this.promise = promise;
179         }
180
181         public void pend(final Queue<PendingWriteRequest> pending) {
182             // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
183             // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
184             // pending.size(), ctx.channel());
185             Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
186                     pending.size(), ctx.channel());
187         }
188     }
189 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.