2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
11 import com.google.common.base.Charsets;
12 import com.google.common.base.Preconditions;
13 import io.netty.buffer.ByteBuf;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.channel.ChannelPromise;
16 import java.util.Deque;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import org.apache.sshd.common.future.SshFutureListener;
20 import org.apache.sshd.common.io.IoOutputStream;
21 import org.apache.sshd.common.io.IoWriteFuture;
22 import org.apache.sshd.common.io.WritePendingException;
23 import org.apache.sshd.common.util.Buffer;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
29 * Also handles pending writes by caching requests until pending state is over.
31 public final class AsyncSshHandlerWriter implements AutoCloseable {
33 private static final Logger LOG = LoggerFactory
34 .getLogger(AsyncSshHandlerWriter.class);
36 // public static final int MAX_PENDING_WRITES = 1000;
37 // TODO implement Limiting mechanism for pending writes
38 // But there is a possible issue with limiting:
39 // 1. What to do when queue is full ? Immediate Fail for every request ?
40 // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
41 // when we send/queue 1 chunk and fail the other chunks
43 private IoOutputStream asyncIn;
45 // Order has to be preserved for queued writes
46 private final Deque<PendingWriteRequest> pending = new LinkedList<>();
/**
 * Creates a writer that sends messages to the given SSH output stream.
 *
 * @param asyncIn stream the messages are written to; {@code write} treats a {@code null}
 *                stream as a closed channel
 */
public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
    this.asyncIn = asyncIn;
}
52 public synchronized void write(final ChannelHandlerContext ctx,
53 final Object msg, final ChannelPromise promise) {
54 // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
55 // If we are closed/closing, set immediate fail
56 if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
57 promise.setFailure(new IllegalStateException("Channel closed"));
59 final ByteBuf byteBufMsg = (ByteBuf) msg;
60 if (pending.isEmpty() == false) {
61 queueRequest(ctx, byteBufMsg, promise);
65 writeWithPendingDetection(ctx, promise, byteBufMsg, false);
// Attempts to write byteBufMsg to the SSH stream and registers a listener for the outcome.
// wasPending is false for a first attempt coming from write() and true when the call is a retry
// issued by writePendingIfAny() for a request that already sits in the pending queue.
// NOTE(review): the embedded line numbers of this listing skip values (72-73, 88-89, 95-96, 99,
// 101-104, 107-111, ...), so some statements of this method are not visible here; the comments
// below describe only the lines that are visible.
69 //sending message with pending detection
70 //if resending a message is not successful, then attribute wasPending is true
71 private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
// Trace the outgoing message; guarded because byteBufToString decodes the whole buffer
74 if (LOG.isTraceEnabled()) {
75 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
// toBuffer copies the message bytes into an sshd Buffer; the listener is notified about the write result
77 asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
80 public void operationComplete(final IoWriteFuture future) {
81 if (LOG.isTraceEnabled()) {
82 LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
83 ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
86 // Notify success or failure
87 if (future.isWritten()) {
// Failure path (presumably the else branch of the check above; the branch lines are not visible
// in this listing): log the failure and fail the promise with the exception reported by the write
90 LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
91 promise.setFailure(future.getException());
94 // Not needed anymore, release
// Locks the enclosing writer instance, i.e. the same monitor its synchronized methods use
97 synchronized (AsyncSshHandlerWriter.this) {
98 //rescheduling message from queue after successfully sent
100 byteBufMsg.resetReaderIndex();
105 // Check pending queue and schedule next
106 // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
// NOTE(review): presumably raised by asyncIn.write while an earlier write is still in flight — confirm against sshd
112 } catch (final WritePendingException e) {
// Only a first attempt is queued here; a retry is already in the queue because
// writePendingIfAny() only peeks at the head request before calling this method
114 if(wasPending == false){
115 queueRequest(ctx, byteBufMsg, promise);
120 private synchronized void writePendingIfAny() {
122 if (pending.peek() == null) {
126 final PendingWriteRequest pendingWrite = pending.peek();
127 final ByteBuf msg = pendingWrite.msg;
128 if (LOG.isTraceEnabled()) {
129 LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
132 writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
135 public static String byteBufToString(final ByteBuf msg) {
136 final String s = msg.toString(Charsets.UTF_8);
137 msg.resetReaderIndex();
141 private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
143 LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
144 if (LOG.isTraceEnabled()) {
145 LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
147 new PendingWriteRequest(ctx, msg, promise).pend(pending);
148 // } catch (final Exception ex) {
149 // LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
151 // promise.setFailure(ex);
// Closes this writer; this is the close() of the AutoCloseable interface the class implements.
// It is synchronized on the writer instance, like write() and writePendingIfAny().
// NOTE(review): the body of this method is not visible in this listing — confirm what it
// resets or releases (write() checks asyncIn for null) before relying on its behavior.
156 public synchronized void close() {
160 private Buffer toBuffer(final ByteBuf msg) {
161 // TODO Buffer vs ByteBuf translate, Can we handle that better ?
162 msg.resetReaderIndex();
163 final byte[] temp = new byte[msg.readableBytes()];
164 msg.readBytes(temp, 0, msg.readableBytes());
165 return new Buffer(temp);
// Holder for a write that could not be sent yet: everything needed to retry it later.
168 private static final class PendingWriteRequest {
// Handler context the write belongs to; its channel is used in log and error messages
169 private final ChannelHandlerContext ctx;
// Message waiting to be written
170 private final ByteBuf msg;
// Promise handed to the retry so the outcome of the write can be reported
171 private final ChannelPromise promise;
/**
 * Captures a write request for a later retry and rewinds the message buffer.
 *
 * @param ctx     channel handler context the write belongs to
 * @param msg     message to write later
 * @param promise promise to notify about the outcome
 */
public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
    // The last (failed) write attempt moved the reader index to the end of the buffer; rewind it
    msg.resetReaderIndex();
    this.ctx = ctx;
    this.msg = msg;
    this.promise = promise;
}
181 public void pend(final Queue<PendingWriteRequest> pending) {
182 // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
183 // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
184 // pending.size(), ctx.channel());
185 Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
186 pending.size(), ctx.channel());