BUG-1521 Netconf-netty-util missing unit tests 50/10950/2
authorMaros Marsalek <mmarsale@cisco.com>
Tue, 9 Sep 2014 13:54:48 +0000 (15:54 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Wed, 10 Sep 2014 07:07:44 +0000 (09:07 +0200)
Change-Id: I1cb19296d2c428214212f342fab64999ad3eb5f3
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/EXIParameters.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/NetconfStartExiMessage.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractChannelInitializerTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEXIHandlersTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/EXIParametersTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/NetconfStartExiMessageTest.java [new file with mode: 0644]
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java [new file with mode: 0644]

index 993709258a3410b80a8a6fbdde47f0374ca15619..531ba3ccb725589d76725ebfc9ecdae7ecdd35bc 100644 (file)
@@ -7,18 +7,21 @@
  */
 package org.opendaylight.controller.netconf.nettyutil.handler.exi;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.openexi.proc.common.AlignmentType;
 import org.openexi.proc.common.EXIOptions;
 import org.openexi.proc.common.EXIOptionsException;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import com.google.common.base.Preconditions;
 
 public final class EXIParameters {
     private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
-    private static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
-    private static final String EXI_PARAMETER_BIT_PACKED = "bit-packed";
-    private static final String EXI_PARAMETER_COMPRESSED = "compressed";
-    private static final String EXI_PARAMETER_PRE_COMPRESSION = "pre-compression";
+    static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
+    static final String EXI_PARAMETER_BIT_PACKED = "bit-packed";
+    static final String EXI_PARAMETER_COMPRESSED = "compressed";
+    static final String EXI_PARAMETER_PRE_COMPRESSION = "pre-compression";
 
     private static final String EXI_PARAMETER_FIDELITY = "fidelity";
     private static final String EXI_FIDELITY_DTD = "dtd";
@@ -38,15 +41,25 @@ public final class EXIParameters {
         final EXIOptions options =  new EXIOptions();
 
         options.setAlignmentType(AlignmentType.bitPacked);
-        if (root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT).getLength() > 0) {
-            if (root.getElementsByTagName(EXI_PARAMETER_BIT_PACKED).getLength() > 0) {
-                options.setAlignmentType(AlignmentType.bitPacked);
-            } else if (root.getElementsByTagName(EXI_PARAMETER_BYTE_ALIGNED).getLength() > 0) {
-                options.setAlignmentType(AlignmentType.byteAligned);
-            } else if (root.getElementsByTagName(EXI_PARAMETER_COMPRESSED).getLength() > 0) {
-                options.setAlignmentType(AlignmentType.compress);
-            } else if (root.getElementsByTagName(EXI_PARAMETER_PRE_COMPRESSION).getLength() > 0) {
-                options.setAlignmentType(AlignmentType.preCompress);
+
+        final NodeList alignmentElements = root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT);
+        if (alignmentElements.getLength() > 0) {
+            final Element alignmentElement = (Element) alignmentElements.item(0);
+            final String alignmentTextContent = alignmentElement.getTextContent().trim();
+
+            switch (alignmentTextContent) {
+                case EXI_PARAMETER_BIT_PACKED:
+                    options.setAlignmentType(AlignmentType.bitPacked);
+                    break;
+                case EXI_PARAMETER_BYTE_ALIGNED:
+                    options.setAlignmentType(AlignmentType.byteAligned);
+                    break;
+                case EXI_PARAMETER_COMPRESSED:
+                    options.setAlignmentType(AlignmentType.compress);
+                    break;
+                case EXI_PARAMETER_PRE_COMPRESSION:
+                    options.setAlignmentType(AlignmentType.preCompress);
+                    break;
             }
         }
 
index 72eb774b5303efb14769d7fe1da644ea34456d82..1d301d3d35cc05f63d83c5643d400f3f643e02d7 100644 (file)
@@ -8,8 +8,8 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.exi;
 
+import com.google.common.collect.Lists;
 import java.util.List;
-
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -17,8 +17,6 @@ import org.openexi.proc.common.EXIOptions;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import com.google.common.collect.Lists;
-
 /**
  * Start-exi netconf message.
  */
@@ -33,19 +31,19 @@ public final class NetconfStartExiMessage extends NetconfMessage {
     public static final String PIS_KEY = "pis";
     public static final String PREFIXES_KEY = "prefixes";
 
-    private NetconfStartExiMessage(Document doc) {
+    private NetconfStartExiMessage(final Document doc) {
         super(doc);
     }
 
-    public static NetconfStartExiMessage create(EXIOptions exiOptions, String messageId) {
-        Document doc = XmlUtil.newDocument();
-        Element rpcElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
+    public static NetconfStartExiMessage create(final EXIOptions exiOptions, final String messageId) {
+        final Document doc = XmlUtil.newDocument();
+        final Element rpcElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
                 XmlNetconfConstants.RPC_KEY);
         rpcElement.setAttributeNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
                 XmlNetconfConstants.MESSAGE_ID, messageId);
 
         // TODO draft http://tools.ietf.org/html/draft-varga-netconf-exi-capability-02#section-3.5.1 has no namespace for start-exi element in xml
-        Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
+        final Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
                 START_EXI);
 
         addAlignment(exiOptions, doc, startExiElement);
@@ -57,8 +55,8 @@ public final class NetconfStartExiMessage extends NetconfMessage {
         return new NetconfStartExiMessage(doc);
     }
 
-    private static void addFidelity(EXIOptions exiOptions, Document doc, Element startExiElement) {
-        List<Element> fidelityElements = Lists.newArrayList();
+    private static void addFidelity(final EXIOptions exiOptions, final Document doc, final Element startExiElement) {
+        final List<Element> fidelityElements = Lists.newArrayList();
         createFidelityElement(doc, fidelityElements, exiOptions.getPreserveComments(), COMMENTS_KEY);
         createFidelityElement(doc, fidelityElements, exiOptions.getPreserveDTD(), DTD_KEY);
         createFidelityElement(doc, fidelityElements, exiOptions.getPreserveLexicalValues(), LEXICAL_VALUES_KEY);
@@ -66,23 +64,44 @@ public final class NetconfStartExiMessage extends NetconfMessage {
         createFidelityElement(doc, fidelityElements, exiOptions.getPreserveNS(), PREFIXES_KEY);
 
         if (fidelityElements.isEmpty() == false) {
-            Element fidelityElement = doc.createElementNS(
+            final Element fidelityElement = doc.createElementNS(
                     XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0, FIDELITY_KEY);
-            for (Element element : fidelityElements) {
+            for (final Element element : fidelityElements) {
                 fidelityElement.appendChild(element);
             }
             startExiElement.appendChild(fidelityElement);
         }
     }
 
-    private static void addAlignment(EXIOptions exiOptions, Document doc, Element startExiElement) {
-        Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
+    private static void addAlignment(final EXIOptions exiOptions, final Document doc, final Element startExiElement) {
+        final Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
                 ALIGNMENT_KEY);
-        alignmentElement.setTextContent(exiOptions.getAlignmentType().toString());
+
+        String alignmentString = EXIParameters.EXI_PARAMETER_BIT_PACKED;
+        switch (exiOptions.getAlignmentType()) {
+        case byteAligned: {
+            alignmentString = EXIParameters.EXI_PARAMETER_BYTE_ALIGNED;
+            break;
+        }
+        case bitPacked: {
+            alignmentString = EXIParameters.EXI_PARAMETER_BIT_PACKED;
+            break;
+        }
+        case compress: {
+            alignmentString = EXIParameters.EXI_PARAMETER_COMPRESSED;
+            break;
+        }
+        case preCompress: {
+            alignmentString = EXIParameters.EXI_PARAMETER_PRE_COMPRESSION;
+            break;
+        }
+        }
+
+        alignmentElement.setTextContent(alignmentString);
         startExiElement.appendChild(alignmentElement);
     }
 
-    private static void createFidelityElement(Document doc, List<Element> fidelityElements, boolean fidelity, String fidelityName) {
+    private static void createFidelityElement(final Document doc, final List<Element> fidelityElements, final boolean fidelity, final String fidelityName) {
 
         if (fidelity) {
             fidelityElements.add(doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
index 0d877c9ec73797010013df229b9101d86445304f..369c013832790eef19dc2b751baa6a9564bb7800 100644 (file)
@@ -148,9 +148,11 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise = null;
 
         sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
-        sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
-        ctx.fireChannelActive();
+        // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+        if(channel != null) {
+            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            ctx.fireChannelActive();
+        }
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@@ -230,17 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         @Override
         public synchronized void operationComplete(final IoReadFuture future) {
             if(future.getException() != null) {
-
                 if(asyncOut.isClosed() || asyncOut.isClosing()) {
-
                     // Ssh dropped
                     logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
-                    invokeDisconnect();
-                    return;
                 } else {
                     logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    invokeDisconnect();
                 }
+                invokeDisconnect();
+                return;
             }
 
             if (future.getRead() > 0) {
@@ -324,6 +323,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 // Check limit for pending writes
                 pendingWriteCounter++;
                 if(pendingWriteCounter > MAX_PENDING_WRITES) {
+                    promise.setFailure(e);
                     handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
                             ", remote window is not getting read or is too small"));
                 }
@@ -331,6 +331,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
 
                 // In case of pending, re-invoke write after pending is finished
+                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
                 lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(final IoWriteFuture future) {
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractChannelInitializerTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractChannelInitializerTest.java
new file mode 100644 (file)
index 0000000..83eafb5
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.concurrent.Promise;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+
+public class AbstractChannelInitializerTest {
+
+    @Mock
+    private Channel channel;
+    @Mock
+    private ChannelPipeline pipeline;
+    @Mock
+    private Promise<NetconfSession> sessionPromise;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        doReturn(pipeline).when(channel).pipeline();
+        doReturn(pipeline).when(pipeline).addLast(anyString(), any(ChannelHandler.class));
+    }
+
+    @Test
+    public void testInit() throws Exception {
+        final TestingInitializer testingInitializer = new TestingInitializer();
+        testingInitializer.initialize(channel, sessionPromise);
+        verify(pipeline, times(4)).addLast(anyString(), any(ChannelHandler.class));
+    }
+
+    private static final class TestingInitializer extends AbstractChannelInitializer<NetconfSession> {
+
+        @Override
+        protected void initializeSessionNegotiator(final Channel ch, final Promise<NetconfSession> promise) {
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java
new file mode 100644 (file)
index 0000000..8199963
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import com.google.common.base.Optional;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.openexi.proc.common.EXIOptions;
+
+public class AbstractNetconfSessionTest {
+
+    @Mock
+    private NetconfSessionListener<NetconfSession> listener;
+    @Mock
+    private Channel channel;
+    @Mock
+    private ChannelPipeline pipeline;
+    private NetconfHelloMessage clientHello;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        doNothing().when(listener).onMessage(any(NetconfSession.class), any(NetconfMessage.class));
+        doNothing().when(listener).onSessionUp(any(NetconfSession.class));
+        doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
+        doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
+
+        doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+        doReturn(pipeline).when(channel).pipeline();
+        doReturn(mock(ChannelFuture.class)).when(channel).close();
+
+        doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
+
+        clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+    }
+
+    @Test
+    public void testHandleMessage() throws Exception {
+        final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        testingNetconfSession.handleMessage(clientHello);
+        verify(listener).onMessage(testingNetconfSession, clientHello);
+    }
+
+    @Test
+    public void testSessionUp() throws Exception {
+        final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        testingNetconfSession.sessionUp();
+        verify(listener).onSessionUp(testingNetconfSession);
+        assertEquals(1L, testingNetconfSession.getSessionId());
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        testingNetconfSession.sessionUp();
+        testingNetconfSession.close();
+        verify(channel).close();
+        verify(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
+    }
+
+    @Test
+    public void testReplaceHandlers() throws Exception {
+        final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        final ChannelHandler mock = mock(ChannelHandler.class);
+        doReturn("handler").when(mock).toString();
+
+        testingNetconfSession.replaceMessageDecoder(mock);
+        verify(pipeline).replace(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, mock);
+        testingNetconfSession.replaceMessageEncoder(mock);
+        verify(pipeline).replace(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, mock);
+        testingNetconfSession.replaceMessageEncoderAfterNextMessage(mock);
+        verifyNoMoreInteractions(pipeline);
+
+        testingNetconfSession.sendMessage(clientHello);
+        verify(pipeline, times(2)).replace(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, mock);
+    }
+
+    @Test
+    public void testStartExi() throws Exception {
+        TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        testingNetconfSession = spy(testingNetconfSession);
+
+        testingNetconfSession.startExiCommunication(NetconfStartExiMessage.create(new EXIOptions(), "4"));
+        verify(testingNetconfSession).addExiHandlers(any(NetconfEXICodec.class));
+    }
+
+    @Test
+    public void testEndOfInput() throws Exception {
+        final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        testingNetconfSession.endOfInput();
+        verifyZeroInteractions(listener);
+        testingNetconfSession.sessionUp();
+        testingNetconfSession.endOfInput();
+        verify(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
+    }
+
+    @Test
+    public void testSendMessage() throws Exception {
+        final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+        final NetconfHelloMessage clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+        testingNetconfSession.sendMessage(clientHello);
+        verify(channel).writeAndFlush(clientHello);
+    }
+
+    private static class TestingNetconfSession extends AbstractNetconfSession<NetconfSession, NetconfSessionListener<NetconfSession>> {
+
+        protected TestingNetconfSession(final NetconfSessionListener<NetconfSession> sessionListener, final Channel channel, final long sessionId) {
+            super(sessionListener, channel, sessionId);
+        }
+
+        @Override
+        protected NetconfSession thisInstance() {
+            return this;
+        }
+
+        @Override
+        protected void addExiHandlers(final NetconfEXICodec exiCodec) {}
+
+        @Override
+        public void stopExiCommunication() {}
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEXIHandlersTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEXIHandlersTest.java
new file mode 100644 (file)
index 0000000..4a8db17
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.*;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.EXIOptions;
+import org.openexi.proc.common.EXIOptionsException;
+import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
+import org.xml.sax.InputSource;
+
+public class NetconfEXIHandlersTest {
+
+    private final String msgAsString = "<netconf-message/>";
+    private NetconfMessageToEXIEncoder netconfMessageToEXIEncoder;
+    private NetconfEXIToMessageDecoder netconfEXIToMessageDecoder;
+    private NetconfMessage msg;
+    private byte[] msgAsExi;
+
+    @Before
+    public void setUp() throws Exception {
+        final NetconfEXICodec codec = new NetconfEXICodec(new EXIOptions());
+        netconfMessageToEXIEncoder = new NetconfMessageToEXIEncoder(codec);
+        netconfEXIToMessageDecoder = new NetconfEXIToMessageDecoder(codec);
+
+        msg = new NetconfMessage(XmlUtil.readXmlToDocument(msgAsString));
+        this.msgAsExi = msgToExi(msgAsString, codec);
+    }
+
+    private byte[] msgToExi(final String msgAsString, final NetconfEXICodec codec) throws EXIOptionsException, TransmogrifierException, IOException {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        final Transmogrifier transmogrifier = codec.getTransmogrifier();
+        transmogrifier.setOutputStream(byteArrayOutputStream);
+        transmogrifier.encode(new InputSource(new ByteArrayInputStream(msgAsString.getBytes())));
+        return byteArrayOutputStream.toByteArray();
+    }
+
+    @Test
+    public void testEncodeDecode() throws Exception {
+        final ByteBuf buffer = Unpooled.buffer();
+        netconfMessageToEXIEncoder.encode(null, msg, buffer);
+        final int exiLength = msgAsExi.length;
+        // array from buffer is cca 256 n length, compare only subarray
+        assertArrayEquals(msgAsExi, Arrays.copyOfRange(buffer.array(), 0, exiLength));
+
+        // assert all other bytes in buffer be 0
+        for (int i = exiLength; i < buffer.array().length; i++) {
+            assertEquals((byte)0, buffer.array()[i]);
+        }
+
+        final List<Object> out = Lists.newArrayList();
+        netconfEXIToMessageDecoder.decode(null, buffer, out);
+
+        XMLUnit.compareXML(msg.getDocument(), ((NetconfMessage) out.get(0)).getDocument());
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/EXIParametersTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/EXIParametersTest.java
new file mode 100644 (file)
index 0000000..15ba3b4
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.AlignmentType;
+import org.openexi.proc.common.EXIOptions;
+
+@RunWith(Parameterized.class)
+public class EXIParametersTest {
+
+    @Parameterized.Parameters
+    public static Iterable<Object[]> data() throws Exception {
+        final String noChangeXml =
+                "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+                "<alignment>bit-packed</alignment>\n" +
+                "</start-exi>\n";
+
+
+        final String fullOptionsXml =
+                "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+                "<alignment>byte-aligned</alignment>\n" +
+                "<fidelity>\n" +
+                "<comments/>\n" +
+                "<dtd/>\n" +
+                "<lexical-values/>\n" +
+                "<pis/>\n" +
+                "<prefixes/>\n" +
+                "</fidelity>\n" +
+                "</start-exi>\n";
+
+        final EXIOptions fullOptions = new EXIOptions();
+        fullOptions.setAlignmentType(AlignmentType.byteAligned);
+        fullOptions.setPreserveLexicalValues(true);
+        fullOptions.setPreserveDTD(true);
+        fullOptions.setPreserveComments(true);
+        fullOptions.setPreserveNS(true);
+        fullOptions.setPreservePIs(true);
+
+        return Arrays.asList(new Object[][]{
+                {noChangeXml, new EXIOptions()},
+                {fullOptionsXml, fullOptions},
+        });
+    }
+
+    private final String sourceXml;
+    private final EXIOptions exiOptions;
+
+    public EXIParametersTest(final String sourceXml, final EXIOptions exiOptions) {
+        this.sourceXml = sourceXml;
+        this.exiOptions = exiOptions;
+    }
+
+    @Test
+    public void testFromXmlElement() throws Exception {
+        final EXIParameters opts =
+                EXIParameters.fromXmlElement(
+                        XmlElement.fromDomElement(
+                                XmlUtil.readXmlToElement(sourceXml)));
+
+
+        assertEquals(opts.getOptions().getAlignmentType(), exiOptions.getAlignmentType());
+        assertEquals(opts.getOptions().getPreserveComments(), exiOptions.getPreserveComments());
+        assertEquals(opts.getOptions().getPreserveLexicalValues(), exiOptions.getPreserveLexicalValues());
+        assertEquals(opts.getOptions().getPreserveNS(), exiOptions.getPreserveNS());
+        assertEquals(opts.getOptions().getPreserveDTD(), exiOptions.getPreserveDTD());
+        assertEquals(opts.getOptions().getPreserveNS(), exiOptions.getPreserveNS());
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/NetconfStartExiMessageTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/exi/NetconfStartExiMessageTest.java
new file mode 100644 (file)
index 0000000..47abe96
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.openexi.proc.common.AlignmentType;
+import org.openexi.proc.common.EXIOptions;
+
+@RunWith(Parameterized.class)
+public class NetconfStartExiMessageTest {
+
+    @Parameterized.Parameters
+    public static Iterable<Object[]> data() throws Exception {
+        final String noChangeXml = "<rpc xmlns:ns0=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ns0:message-id=\"id\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+                "<alignment>bit-packed</alignment>\n" +
+                "</start-exi>\n" +
+                "</rpc>";
+
+
+        final String fullOptionsXml = "<rpc xmlns:ns0=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ns0:message-id=\"id\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+                "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+                "<alignment>byte-aligned</alignment>\n" +
+                "<fidelity>\n" +
+                "<comments/>\n" +
+                "<dtd/>\n" +
+                "<lexical-values/>\n" +
+                "<pis/>\n" +
+                "<prefixes/>\n" +
+                "</fidelity>\n" +
+                "</start-exi>\n" +
+                "</rpc>";
+
+        final EXIOptions fullOptions = new EXIOptions();
+        fullOptions.setAlignmentType(AlignmentType.byteAligned);
+        fullOptions.setPreserveLexicalValues(true);
+        fullOptions.setPreserveDTD(true);
+        fullOptions.setPreserveComments(true);
+        fullOptions.setPreserveNS(true);
+        fullOptions.setPreservePIs(true);
+
+        return Arrays.asList(new Object[][]{
+                {noChangeXml, new EXIOptions()},
+                {fullOptionsXml, fullOptions},
+        });
+    }
+
+    private final String controlXml;
+    private final EXIOptions exiOptions;
+
+    public NetconfStartExiMessageTest(final String controlXml, final EXIOptions exiOptions) {
+        this.controlXml = controlXml;
+        this.exiOptions = exiOptions;
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        final NetconfStartExiMessage startExiMessage = NetconfStartExiMessage.create(exiOptions, "id");
+
+        XMLUnit.setIgnoreWhitespace(true);
+        XMLUnit.setIgnoreAttributeOrder(true);
+        final Diff diff = XMLUnit.compareXML(XMLUnit.buildControlDocument(controlXml), startExiMessage.getDocument());
+        assertTrue(diff.toString(), diff.similar());
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java
new file mode 100644 (file)
index 0000000..223f2c7
--- /dev/null
@@ -0,0 +1,625 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import java.nio.channels.WritePendingException;
+import org.apache.sshd.ClientChannel;
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.SshClient;
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.future.ConnectFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+public class AsyncSshHandlerTest {
+
+    @Mock
+    private SshClient sshClient;
+    @Mock
+    private AuthenticationHandler authHandler;
+    @Mock
+    private ChannelHandlerContext ctx;
+    @Mock
+    private Channel channel;
+    @Mock
+    private SocketAddress remoteAddress;
+    @Mock
+    private SocketAddress localAddress;
+
+    private AsyncSshHandler asyncSshHandler;
+
+    private SshFutureListener<ConnectFuture> sshConnectListener;
+    private SshFutureListener<AuthFuture> sshAuthListener;
+    private SshFutureListener<OpenFuture> sshChannelOpenListener;
+
+    private ChannelPromise promise;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        stubAuth();
+        stubSshClient();
+        stubChannel();
+        stubCtx();
+        stubRemoteAddress();
+
+        promise = getMockedPromise();
+
+        asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        sshConnectListener = null;
+        sshAuthListener = null;
+        sshChannelOpenListener = null;
+        promise = null;
+        asyncSshHandler.close(ctx, getMockedPromise());
+    }
+
+    private void stubAuth() throws IOException {
+        doReturn("usr").when(authHandler).getUsername();
+
+        final AuthFuture authFuture = mock(AuthFuture.class);
+        Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<AuthFuture> result) {
+                sshAuthListener = result;
+            }
+        });
+        doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
+        final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(future).addListener(any(SshFutureListener.class));
+
+        return listenerSettableFuture;
+    }
+
+    private void stubRemoteAddress() {
+        doReturn("remote").when(remoteAddress).toString();
+    }
+
+    private void stubCtx() {
+        doReturn(channel).when(ctx).channel();
+        doReturn(ctx).when(ctx).fireChannelActive();
+        doReturn(ctx).when(ctx).fireChannelInactive();
+        doReturn(ctx).when(ctx).fireChannelRead(anyObject());
+        doReturn(getMockedPromise()).when(ctx).newPromise();
+    }
+
+    private void stubChannel() {
+        doReturn("channel").when(channel).toString();
+    }
+
+    private void stubSshClient() {
+        doNothing().when(sshClient).start();
+        final ConnectFuture connectFuture = mock(ConnectFuture.class);
+        Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<ConnectFuture> result) {
+                sshConnectListener = result;
+            }
+        });
+        doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
+    }
+
+    @Test
+    public void testConnectSuccess() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
+
+        verify(promise).setSuccess();
+        verifyNoMoreInteractions(promise);
+        verify(ctx).fireChannelActive();
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        verify(ctx).fireChannelRead(any(ByteBuf.class));
+    }
+
+    @Test
+    public void testReadClosed() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoReadFuture mockedReadFuture = asyncOut.read(null);
+
+        Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+                doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
+                doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+                doReturn(true).when(asyncOut).isClosing();
+                doReturn(true).when(asyncOut).isClosed();
+                result.operationComplete(mockedReadFuture);
+            }
+        });
+
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        verify(ctx).fireChannelInactive();
+    }
+
+    @Test
+    public void testReadFail() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoReadFuture mockedReadFuture = asyncOut.read(null);
+
+        Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+                doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
+                doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+                result.operationComplete(mockedReadFuture);
+            }
+        });
+
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        verify(ctx).fireChannelInactive();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        final ChannelPromise writePromise = getMockedPromise();
+        asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
+
+        verify(writePromise).setSuccess();
+    }
+
+    @Test
+    public void testWriteClosed() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+
+        final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+        Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
+                doReturn(false).when(ioWriteFuture).isWritten();
+                doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
+                doReturn(true).when(asyncIn).isClosing();
+                doReturn(true).when(asyncIn).isClosed();
+                result.operationComplete(ioWriteFuture);
+            }
+        });
+
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        final ChannelPromise writePromise = getMockedPromise();
+        asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
+
+        verify(writePromise).setFailure(any(Throwable.class));
+    }
+
+    @Test
+    public void testWritePendingOne() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        final ChannelPromise firstWritePromise = getMockedPromise();
+
+        // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
+        final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+        asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
+        final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
+        // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
+        final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
+
+        final ChannelPromise secondWritePromise = getMockedPromise();
+        // now make write throw pending exception
+        doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+        asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
+
+        doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
+
+        verifyZeroInteractions(firstWritePromise, secondWritePromise);
+
+        // make first write stop pending
+        firstWriteListener.operationComplete(ioWriteFuture);
+        // intercept third listener, this is regular listener for second write to determine success or failure
+        final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
+
+        // notify listener for second write that pending has ended
+        pendingListener.get().operationComplete(ioWriteFuture);
+        // Notify third listener (regular listener for second write) that second write succeeded
+        afterPendingListener.get().operationComplete(ioWriteFuture);
+
+        // verify both write promises successful
+        verify(firstWritePromise).setSuccess();
+        verify(secondWritePromise).setSuccess();
+    }
+
+    @Test
+    public void testWritePendingMax() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        final ChannelPromise firstWritePromise = getMockedPromise();
+
+        // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
+        final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+        asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
+
+        final ChannelPromise secondWritePromise = getMockedPromise();
+        // now make write throw pending exception
+        doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+        for (int i = 0; i < 1000; i++) {
+            asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
+        }
+
+        verify(ctx).fireChannelInactive();
+    }
+
+    @Test
+    public void testDisconnect() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+        final ChannelPromise disconnectPromise = getMockedPromise();
+        asyncSshHandler.disconnect(ctx, disconnectPromise);
+
+        verify(sshSession).close(anyBoolean());
+        verify(disconnectPromise).setSuccess();
+        verify(ctx).fireChannelInactive();
+    }
+
+    private OpenFuture getSuccessOpenFuture() {
+        final OpenFuture failedOpenFuture = mock(OpenFuture.class);
+        doReturn(true).when(failedOpenFuture).isOpened();
+        return failedOpenFuture;
+    }
+
+    private AuthFuture getSuccessAuthFuture() {
+        final AuthFuture authFuture = mock(AuthFuture.class);
+        doReturn(true).when(authFuture).isSuccess();
+        return authFuture;
+    }
+
+    private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
+        final ConnectFuture connectFuture = mock(ConnectFuture.class);
+        doReturn(true).when(connectFuture).isConnected();
+
+        doReturn(sshSession).when(connectFuture).getSession();
+        return connectFuture;
+    }
+
+    private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
+        final ClientSession sshSession = mock(ClientSession.class);
+
+        doReturn("sshSession").when(sshSession).toString();
+        doReturn("serverVersion").when(sshSession).getServerVersion();
+        doReturn(false).when(sshSession).isClosed();
+        doReturn(false).when(sshSession).isClosing();
+        final CloseFuture closeFuture = mock(CloseFuture.class);
+        Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<CloseFuture> result) {
+                doReturn(true).when(closeFuture).isClosed();
+                result.operationComplete(closeFuture);
+            }
+        });
+        doReturn(closeFuture).when(sshSession).close(false);
+
+        doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
+
+        return sshSession;
+    }
+
+    private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
+        final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
+        doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
+        final OpenFuture openFuture = mock(OpenFuture.class);
+
+        Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<OpenFuture> result) {
+                sshChannelOpenListener = result;
+            }
+        });
+
+        doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
+
+        doReturn(openFuture).when(subsystemChannel).open();
+        doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
+        return subsystemChannel;
+    }
+
+    private IoOutputStream getMockedIoOutputStream() {
+        final IoOutputStream mock = mock(IoOutputStream.class);
+        final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
+        doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
+        doReturn(true).when(ioWriteFuture).isWritten();
+
+        Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
+                result.operationComplete(ioWriteFuture);
+            }
+        });
+
+        doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
+        doReturn(false).when(mock).isClosed();
+        doReturn(false).when(mock).isClosing();
+        return mock;
+    }
+
+    private IoInputStream getMockedIoInputStream() {
+        final IoInputStream mock = mock(IoInputStream.class);
+        final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
+        doReturn(null).when(ioReadFuture).getException();
+        doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+        doReturn(5).when(ioReadFuture).getRead();
+        doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
+        doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+
+        // Always success for read
+        Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
+            @Override
+            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+                result.operationComplete(ioReadFuture);
+            }
+        });
+
+        doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
+        doReturn(false).when(mock).isClosed();
+        doReturn(false).when(mock).isClosing();
+        return mock;
+    }
+
+    @Test
+    public void testConnectFailOpenChannel() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final IoInputStream asyncOut = getMockedIoInputStream();
+        final IoOutputStream asyncIn = getMockedIoOutputStream();
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+
+        sshAuthListener.operationComplete(getSuccessAuthFuture());
+
+        verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
+
+        try {
+            sshChannelOpenListener.operationComplete(getFailedOpenFuture());
+            fail("Exception expected");
+        } catch (final Exception e) {
+            verify(promise).setFailure(any(Throwable.class));
+            verifyNoMoreInteractions(promise);
+            // TODO should ctx.channelInactive be called if we throw exception ?
+        }
+    }
+
+    @Test
+    public void testConnectFailAuth() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final ClientSession sshSession = mock(ClientSession.class);
+        doReturn(true).when(sshSession).isClosed();
+        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+        sshConnectListener.operationComplete(connectFuture);
+
+        final AuthFuture authFuture = getFailedAuthFuture();
+
+        try {
+            sshAuthListener.operationComplete(authFuture);
+            fail("Exception expected");
+        } catch (final Exception e) {
+            verify(promise).setFailure(any(Throwable.class));
+            verifyNoMoreInteractions(promise);
+            // TODO should ctx.channelInactive be called ?
+        }
+    }
+
+    private AuthFuture getFailedAuthFuture() {
+        final AuthFuture authFuture = mock(AuthFuture.class);
+        doReturn(false).when(authFuture).isSuccess();
+        doReturn(new IllegalStateException()).when(authFuture).getException();
+        return authFuture;
+    }
+
+    private OpenFuture getFailedOpenFuture() {
+        final OpenFuture authFuture = mock(OpenFuture.class);
+        doReturn(false).when(authFuture).isOpened();
+        doReturn(new IllegalStateException()).when(authFuture).getException();
+        return authFuture;
+    }
+
+    @Test
+    public void testConnectFail() throws Exception {
+        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+        final ConnectFuture connectFuture = getFailedConnectFuture();
+        try {
+            sshConnectListener.operationComplete(connectFuture);
+            fail("Exception expected");
+        } catch (final Exception e) {
+            verify(promise).setFailure(any(Throwable.class));
+            verifyNoMoreInteractions(promise);
+            // TODO should ctx.channelInactive be called ?
+        }
+    }
+
+    private ConnectFuture getFailedConnectFuture() {
+        final ConnectFuture connectFuture = mock(ConnectFuture.class);
+        doReturn(false).when(connectFuture).isConnected();
+        doReturn(new IllegalStateException()).when(connectFuture).getException();
+        return connectFuture;
+    }
+
+    private ChannelPromise getMockedPromise() {
+        final ChannelPromise promise = mock(ChannelPromise.class);
+        doReturn(promise).when(promise).setSuccess();
+        doReturn(promise).when(promise).setFailure(any(Throwable.class));
+        return promise;
+    }
+
+    private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
+
+        @Override
+        public abstract void onSuccess(final SshFutureListener<T> result);
+
+        @Override
+        public void onFailure(final Throwable t) {
+            throw new RuntimeException(t);
+        }
+    }
+}