fix Honeynode issues with fluorine
[transportpce.git] / tests / honeynode / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
diff --git a/tests/honeynode/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/tests/honeynode/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java
new file mode 100644 (file)
index 0000000..d116b99
--- /dev/null
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.nio.charset.StandardCharsets;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.util.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
+ * Also handles pending writes by caching requests until pending state is over.
+ */
+public final class AsyncSshHandlerWriter implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(AsyncSshHandlerWriter.class);
+
+    // public static final int MAX_PENDING_WRITES = 1000;
+    // TODO implement Limiting mechanism for pending writes
+    // But there is a possible issue with limiting:
+    // 1. What to do when queue is full ? Immediate Fail for every request ?
+    // 2. At this level we might be dealing with Chunks of messages(not whole messages)
+    // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
+
+    private final Object asyncInLock = new Object();
+    private volatile IoOutputStream asyncIn;
+
+    // Order has to be preserved for queued writes
+    private final Deque<PendingWriteRequest> pending = new LinkedList<>();
+
+    public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
+        this.asyncIn = asyncIn;
+    }
+
+    public void write(final ChannelHandlerContext ctx,
+            final Object msg, final ChannelPromise promise) {
+        if (asyncIn == null) {
+            promise.setFailure(new IllegalStateException("Channel closed"));
+            return;
+        }
+        // synchronized block due to deadlock that happens on ssh window resize
+        // writes and pending writes would lock the underlyinch channel session
+        // window resize write would try to write the message on an already locked channelSession
+        // while the pending write was in progress from the write callback
+        synchronized (asyncInLock) {
+            // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+            // If we are closed/closing, set immediate fail
+            if (asyncIn.isClosed() || asyncIn.isClosing()) {
+                promise.setFailure(new IllegalStateException("Channel closed"));
+            } else {
+                final ByteBuf byteBufMsg = (ByteBuf) msg;
+                if (pending.isEmpty() == false) {
+                    queueRequest(ctx, byteBufMsg, promise);
+                    return;
+                }
+
+                writeWithPendingDetection(ctx, promise, byteBufMsg, false);
+            }
+        }
+    }
+
+    //sending message with pending
+    //if resending message not succesfull, then attribute wasPending is true
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
+                                           final ByteBuf byteBufMsg, final boolean wasPending) {
+        try {
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+            }
+            asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
+
+                @Override
+                public void operationComplete(final IoWriteFuture future) {
+                    // synchronized block due to deadlock that happens on ssh window resize
+                    // writes and pending writes would lock the underlyinch channel session
+                    // window resize write would try to write the message on an already locked channelSession,
+                    // while the pending write was in progress from the write callback
+                    synchronized (asyncInLock) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace(
+                                "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                                ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                        }
+
+                        // Notify success or failure
+                        if (future.isWritten()) {
+                            promise.setSuccess();
+                        } else {
+                            LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
+                                    byteBufToString(byteBufMsg), future.getException());
+                            promise.setFailure(future.getException());
+                        }
+
+                        // Not needed anymore, release
+                        byteBufMsg.release();
+
+                        //rescheduling message from queue after successfully sent
+                        if (wasPending) {
+                            byteBufMsg.resetReaderIndex();
+                            pending.remove();
+                        }
+                    }
+
+                    // Check pending queue and schedule next
+                    // At this time we are guaranteed that we are not in pending state anymore
+                    // so the next request should succeed
+                    writePendingIfAny();
+                }
+            });
+
+        } catch (final WritePendingException e) {
+
+            if (wasPending == false) {
+                queueRequest(ctx, byteBufMsg, promise);
+            }
+        }
+    }
+
+    private void writePendingIfAny() {
+        synchronized (asyncInLock) {
+            if (pending.peek() == null) {
+                return;
+            }
+
+            final PendingWriteRequest pendingWrite = pending.peek();
+            final ByteBuf msg = pendingWrite.msg;
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Writing pending request on channel: {}, message: {}",
+                        pendingWrite.ctx.channel(), byteBufToString(msg));
+            }
+
+            writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
+        }
+    }
+
+    public static String byteBufToString(final ByteBuf msg) {
+        final String s = msg.toString(StandardCharsets.UTF_8);
+        msg.resetReaderIndex();
+        return s;
+    }
+
+    private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+//        try {
+        LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
+        }
+        new PendingWriteRequest(ctx, msg, promise).pend(pending);
+//        } catch (final Exception ex) {
+//            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
+//                    ctx.channel(), ex, byteBufToString(msg));
+//            msg.release();
+//            promise.setFailure(ex);
+//        }
+    }
+
+    @Override
+    public void close() {
+        asyncIn = null;
+    }
+
+    private static Buffer toBuffer(final ByteBuf msg) {
+        // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        msg.resetReaderIndex();
+        final byte[] temp = new byte[msg.readableBytes()];
+        msg.readBytes(temp, 0, msg.readableBytes());
+        return new Buffer(temp);
+    }
+
+    private static final class PendingWriteRequest {
+        private final ChannelHandlerContext ctx;
+        private final ByteBuf msg;
+        private final ChannelPromise promise;
+
+        PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+            this.ctx = ctx;
+            // Reset reader index, last write (failed) attempt moved index to the end
+            msg.resetReaderIndex();
+            this.msg = msg;
+            this.promise = promise;
+        }
+
+        public void pend(final Queue<PendingWriteRequest> pending) {
+            // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+            // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+            // pending.size(), ctx.channel());
+            Preconditions.checkState(pending.offer(this),
+                "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
+        }
+    }
+}