*/
package org.opendaylight.controller.netconf.nettyutil.handler.exi;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.openexi.proc.common.AlignmentType;
import org.openexi.proc.common.EXIOptions;
import org.openexi.proc.common.EXIOptionsException;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import com.google.common.base.Preconditions;
public final class EXIParameters {
private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
- private static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
- private static final String EXI_PARAMETER_BIT_PACKED = "bit-packed";
- private static final String EXI_PARAMETER_COMPRESSED = "compressed";
- private static final String EXI_PARAMETER_PRE_COMPRESSION = "pre-compression";
+ static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
+ static final String EXI_PARAMETER_BIT_PACKED = "bit-packed";
+ static final String EXI_PARAMETER_COMPRESSED = "compressed";
+ static final String EXI_PARAMETER_PRE_COMPRESSION = "pre-compression";
private static final String EXI_PARAMETER_FIDELITY = "fidelity";
private static final String EXI_FIDELITY_DTD = "dtd";
final EXIOptions options = new EXIOptions();
options.setAlignmentType(AlignmentType.bitPacked);
- if (root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT).getLength() > 0) {
- if (root.getElementsByTagName(EXI_PARAMETER_BIT_PACKED).getLength() > 0) {
- options.setAlignmentType(AlignmentType.bitPacked);
- } else if (root.getElementsByTagName(EXI_PARAMETER_BYTE_ALIGNED).getLength() > 0) {
- options.setAlignmentType(AlignmentType.byteAligned);
- } else if (root.getElementsByTagName(EXI_PARAMETER_COMPRESSED).getLength() > 0) {
- options.setAlignmentType(AlignmentType.compress);
- } else if (root.getElementsByTagName(EXI_PARAMETER_PRE_COMPRESSION).getLength() > 0) {
- options.setAlignmentType(AlignmentType.preCompress);
+
+ final NodeList alignmentElements = root.getElementsByTagName(EXI_PARAMETER_ALIGNMENT);
+ if (alignmentElements.getLength() > 0) {
+ final Element alignmentElement = (Element) alignmentElements.item(0);
+ final String alignmentTextContent = alignmentElement.getTextContent().trim();
+
+ switch (alignmentTextContent) {
+ case EXI_PARAMETER_BIT_PACKED:
+ options.setAlignmentType(AlignmentType.bitPacked);
+ break;
+ case EXI_PARAMETER_BYTE_ALIGNED:
+ options.setAlignmentType(AlignmentType.byteAligned);
+ break;
+ case EXI_PARAMETER_COMPRESSED:
+ options.setAlignmentType(AlignmentType.compress);
+ break;
+ case EXI_PARAMETER_PRE_COMPRESSION:
+ options.setAlignmentType(AlignmentType.preCompress);
+ break;
}
}
package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+import com.google.common.collect.Lists;
import java.util.List;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import com.google.common.collect.Lists;
-
/**
* Start-exi netconf message.
*/
public static final String PIS_KEY = "pis";
public static final String PREFIXES_KEY = "prefixes";
- private NetconfStartExiMessage(Document doc) {
+ private NetconfStartExiMessage(final Document doc) {
super(doc);
}
- public static NetconfStartExiMessage create(EXIOptions exiOptions, String messageId) {
- Document doc = XmlUtil.newDocument();
- Element rpcElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
+ public static NetconfStartExiMessage create(final EXIOptions exiOptions, final String messageId) {
+ final Document doc = XmlUtil.newDocument();
+ final Element rpcElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
XmlNetconfConstants.RPC_KEY);
rpcElement.setAttributeNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
XmlNetconfConstants.MESSAGE_ID, messageId);
// TODO draft http://tools.ietf.org/html/draft-varga-netconf-exi-capability-02#section-3.5.1 has no namespace for start-exi element in xml
- Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
+ final Element startExiElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
START_EXI);
addAlignment(exiOptions, doc, startExiElement);
return new NetconfStartExiMessage(doc);
}
- private static void addFidelity(EXIOptions exiOptions, Document doc, Element startExiElement) {
- List<Element> fidelityElements = Lists.newArrayList();
+ private static void addFidelity(final EXIOptions exiOptions, final Document doc, final Element startExiElement) {
+ final List<Element> fidelityElements = Lists.newArrayList();
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveComments(), COMMENTS_KEY);
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveDTD(), DTD_KEY);
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveLexicalValues(), LEXICAL_VALUES_KEY);
createFidelityElement(doc, fidelityElements, exiOptions.getPreserveNS(), PREFIXES_KEY);
if (fidelityElements.isEmpty() == false) {
- Element fidelityElement = doc.createElementNS(
+ final Element fidelityElement = doc.createElementNS(
XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0, FIDELITY_KEY);
- for (Element element : fidelityElements) {
+ for (final Element element : fidelityElements) {
fidelityElement.appendChild(element);
}
startExiElement.appendChild(fidelityElement);
}
}
- private static void addAlignment(EXIOptions exiOptions, Document doc, Element startExiElement) {
- Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
+ private static void addAlignment(final EXIOptions exiOptions, final Document doc, final Element startExiElement) {
+ final Element alignmentElement = doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
ALIGNMENT_KEY);
- alignmentElement.setTextContent(exiOptions.getAlignmentType().toString());
+
+ String alignmentString = EXIParameters.EXI_PARAMETER_BIT_PACKED;
+ switch (exiOptions.getAlignmentType()) {
+ case byteAligned: {
+ alignmentString = EXIParameters.EXI_PARAMETER_BYTE_ALIGNED;
+ break;
+ }
+ case bitPacked: {
+ alignmentString = EXIParameters.EXI_PARAMETER_BIT_PACKED;
+ break;
+ }
+ case compress: {
+ alignmentString = EXIParameters.EXI_PARAMETER_COMPRESSED;
+ break;
+ }
+ case preCompress: {
+ alignmentString = EXIParameters.EXI_PARAMETER_PRE_COMPRESSION;
+ break;
+ }
+ }
+
+ alignmentElement.setTextContent(alignmentString);
startExiElement.appendChild(alignmentElement);
}
- private static void createFidelityElement(Document doc, List<Element> fidelityElements, boolean fidelity, String fidelityName) {
+ private static void createFidelityElement(final Document doc, final List<Element> fidelityElements, final boolean fidelity, final String fidelityName) {
if (fidelity) {
fidelityElements.add(doc.createElementNS(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0,
connectPromise = null;
sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
- sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
- ctx.fireChannelActive();
+ // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+ if(channel != null) {
+ sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+ ctx.fireChannelActive();
+ }
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@Override
public synchronized void operationComplete(final IoReadFuture future) {
if(future.getException() != null) {
-
if(asyncOut.isClosed() || asyncOut.isClosing()) {
-
// Ssh dropped
logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
- invokeDisconnect();
- return;
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- invokeDisconnect();
}
+ invokeDisconnect();
+ return;
}
if (future.getRead() > 0) {
// Check limit for pending writes
pendingWriteCounter++;
if(pendingWriteCounter > MAX_PENDING_WRITES) {
+ promise.setFailure(e);
handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
", remote window is not getting read or is too small"));
}
logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
// In case of pending, re-invoke write after pending is finished
+ Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.concurrent.Promise;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+
+public class AbstractChannelInitializerTest {
+
+ @Mock
+ private Channel channel;
+ @Mock
+ private ChannelPipeline pipeline;
+ @Mock
+ private Promise<NetconfSession> sessionPromise;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ doReturn(pipeline).when(channel).pipeline();
+ doReturn(pipeline).when(pipeline).addLast(anyString(), any(ChannelHandler.class));
+ }
+
+ @Test
+ public void testInit() throws Exception {
+ final TestingInitializer testingInitializer = new TestingInitializer();
+ testingInitializer.initialize(channel, sessionPromise);
+ verify(pipeline, times(4)).addLast(anyString(), any(ChannelHandler.class));
+ }
+
+ private static final class TestingInitializer extends AbstractChannelInitializer<NetconfSession> {
+
+ @Override
+ protected void initializeSessionNegotiator(final Channel ch, final Promise<NetconfSession> promise) {
+ }
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import com.google.common.base.Optional;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.openexi.proc.common.EXIOptions;
+
+public class AbstractNetconfSessionTest {
+
+ @Mock
+ private NetconfSessionListener<NetconfSession> listener;
+ @Mock
+ private Channel channel;
+ @Mock
+ private ChannelPipeline pipeline;
+ private NetconfHelloMessage clientHello;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ doNothing().when(listener).onMessage(any(NetconfSession.class), any(NetconfMessage.class));
+ doNothing().when(listener).onSessionUp(any(NetconfSession.class));
+ doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
+ doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
+
+ doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+ doReturn(pipeline).when(channel).pipeline();
+ doReturn(mock(ChannelFuture.class)).when(channel).close();
+
+ doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
+
+ clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+ }
+
+ @Test
+ public void testHandleMessage() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.handleMessage(clientHello);
+ verify(listener).onMessage(testingNetconfSession, clientHello);
+ }
+
+ @Test
+ public void testSessionUp() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.sessionUp();
+ verify(listener).onSessionUp(testingNetconfSession);
+ assertEquals(1L, testingNetconfSession.getSessionId());
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.sessionUp();
+ testingNetconfSession.close();
+ verify(channel).close();
+ verify(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
+ }
+
+ @Test
+ public void testReplaceHandlers() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ final ChannelHandler mock = mock(ChannelHandler.class);
+ doReturn("handler").when(mock).toString();
+
+ testingNetconfSession.replaceMessageDecoder(mock);
+ verify(pipeline).replace(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, mock);
+ testingNetconfSession.replaceMessageEncoder(mock);
+ verify(pipeline).replace(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, mock);
+ testingNetconfSession.replaceMessageEncoderAfterNextMessage(mock);
+ verifyNoMoreInteractions(pipeline);
+
+ testingNetconfSession.sendMessage(clientHello);
+ verify(pipeline, times(2)).replace(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, mock);
+ }
+
+ @Test
+ public void testStartExi() throws Exception {
+ TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession = spy(testingNetconfSession);
+
+ testingNetconfSession.startExiCommunication(NetconfStartExiMessage.create(new EXIOptions(), "4"));
+ verify(testingNetconfSession).addExiHandlers(any(NetconfEXICodec.class));
+ }
+
+ @Test
+ public void testEndOfInput() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ testingNetconfSession.endOfInput();
+ verifyZeroInteractions(listener);
+ testingNetconfSession.sessionUp();
+ testingNetconfSession.endOfInput();
+ verify(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
+ }
+
+ @Test
+ public void testSendMessage() throws Exception {
+ final TestingNetconfSession testingNetconfSession = new TestingNetconfSession(listener, channel, 1L);
+ final NetconfHelloMessage clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
+ testingNetconfSession.sendMessage(clientHello);
+ verify(channel).writeAndFlush(clientHello);
+ }
+
+ private static class TestingNetconfSession extends AbstractNetconfSession<NetconfSession, NetconfSessionListener<NetconfSession>> {
+
+ protected TestingNetconfSession(final NetconfSessionListener<NetconfSession> sessionListener, final Channel channel, final long sessionId) {
+ super(sessionListener, channel, sessionId);
+ }
+
+ @Override
+ protected NetconfSession thisInstance() {
+ return this;
+ }
+
+ @Override
+ protected void addExiHandlers(final NetconfEXICodec exiCodec) {}
+
+ @Override
+ public void stopExiCommunication() {}
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.*;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.EXIOptions;
+import org.openexi.proc.common.EXIOptionsException;
+import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
+import org.xml.sax.InputSource;
+
+public class NetconfEXIHandlersTest {
+
+ private final String msgAsString = "<netconf-message/>";
+ private NetconfMessageToEXIEncoder netconfMessageToEXIEncoder;
+ private NetconfEXIToMessageDecoder netconfEXIToMessageDecoder;
+ private NetconfMessage msg;
+ private byte[] msgAsExi;
+
+ @Before
+ public void setUp() throws Exception {
+ final NetconfEXICodec codec = new NetconfEXICodec(new EXIOptions());
+ netconfMessageToEXIEncoder = new NetconfMessageToEXIEncoder(codec);
+ netconfEXIToMessageDecoder = new NetconfEXIToMessageDecoder(codec);
+
+ msg = new NetconfMessage(XmlUtil.readXmlToDocument(msgAsString));
+ this.msgAsExi = msgToExi(msgAsString, codec);
+ }
+
+ private byte[] msgToExi(final String msgAsString, final NetconfEXICodec codec) throws EXIOptionsException, TransmogrifierException, IOException {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ final Transmogrifier transmogrifier = codec.getTransmogrifier();
+ transmogrifier.setOutputStream(byteArrayOutputStream);
+ transmogrifier.encode(new InputSource(new ByteArrayInputStream(msgAsString.getBytes())));
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ @Test
+ public void testEncodeDecode() throws Exception {
+ final ByteBuf buffer = Unpooled.buffer();
+ netconfMessageToEXIEncoder.encode(null, msg, buffer);
+ final int exiLength = msgAsExi.length;
+ // array from buffer is cca 256 n length, compare only subarray
+ assertArrayEquals(msgAsExi, Arrays.copyOfRange(buffer.array(), 0, exiLength));
+
+ // assert all other bytes in buffer be 0
+ for (int i = exiLength; i < buffer.array().length; i++) {
+ assertEquals((byte)0, buffer.array()[i]);
+ }
+
+ final List<Object> out = Lists.newArrayList();
+ netconfEXIToMessageDecoder.decode(null, buffer, out);
+
+ XMLUnit.compareXML(msg.getDocument(), ((NetconfMessage) out.get(0)).getDocument());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.opendaylight.controller.netconf.util.xml.XmlElement;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.openexi.proc.common.AlignmentType;
+import org.openexi.proc.common.EXIOptions;
+
+@RunWith(Parameterized.class)
+public class EXIParametersTest {
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() throws Exception {
+ final String noChangeXml =
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>bit-packed</alignment>\n" +
+ "</start-exi>\n";
+
+
+ final String fullOptionsXml =
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>byte-aligned</alignment>\n" +
+ "<fidelity>\n" +
+ "<comments/>\n" +
+ "<dtd/>\n" +
+ "<lexical-values/>\n" +
+ "<pis/>\n" +
+ "<prefixes/>\n" +
+ "</fidelity>\n" +
+ "</start-exi>\n";
+
+ final EXIOptions fullOptions = new EXIOptions();
+ fullOptions.setAlignmentType(AlignmentType.byteAligned);
+ fullOptions.setPreserveLexicalValues(true);
+ fullOptions.setPreserveDTD(true);
+ fullOptions.setPreserveComments(true);
+ fullOptions.setPreserveNS(true);
+ fullOptions.setPreservePIs(true);
+
+ return Arrays.asList(new Object[][]{
+ {noChangeXml, new EXIOptions()},
+ {fullOptionsXml, fullOptions},
+ });
+ }
+
+ private final String sourceXml;
+ private final EXIOptions exiOptions;
+
+ public EXIParametersTest(final String sourceXml, final EXIOptions exiOptions) {
+ this.sourceXml = sourceXml;
+ this.exiOptions = exiOptions;
+ }
+
+ @Test
+ public void testFromXmlElement() throws Exception {
+ final EXIParameters opts =
+ EXIParameters.fromXmlElement(
+ XmlElement.fromDomElement(
+ XmlUtil.readXmlToElement(sourceXml)));
+
+
+ assertEquals(opts.getOptions().getAlignmentType(), exiOptions.getAlignmentType());
+ assertEquals(opts.getOptions().getPreserveComments(), exiOptions.getPreserveComments());
+ assertEquals(opts.getOptions().getPreserveLexicalValues(), exiOptions.getPreserveLexicalValues());
+ assertEquals(opts.getOptions().getPreserveNS(), exiOptions.getPreserveNS());
+ assertEquals(opts.getOptions().getPreserveDTD(), exiOptions.getPreserveDTD());
+ assertEquals(opts.getOptions().getPreserveNS(), exiOptions.getPreserveNS());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import org.custommonkey.xmlunit.Diff;
+import org.custommonkey.xmlunit.XMLUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.openexi.proc.common.AlignmentType;
+import org.openexi.proc.common.EXIOptions;
+
+@RunWith(Parameterized.class)
+public class NetconfStartExiMessageTest {
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() throws Exception {
+ final String noChangeXml = "<rpc xmlns:ns0=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ns0:message-id=\"id\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>bit-packed</alignment>\n" +
+ "</start-exi>\n" +
+ "</rpc>";
+
+
+ final String fullOptionsXml = "<rpc xmlns:ns0=\"urn:ietf:params:xml:ns:netconf:base:1.0\" ns0:message-id=\"id\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<start-exi xmlns=\"urn:ietf:params:xml:ns:netconf:exi:1.0\">\n" +
+ "<alignment>byte-aligned</alignment>\n" +
+ "<fidelity>\n" +
+ "<comments/>\n" +
+ "<dtd/>\n" +
+ "<lexical-values/>\n" +
+ "<pis/>\n" +
+ "<prefixes/>\n" +
+ "</fidelity>\n" +
+ "</start-exi>\n" +
+ "</rpc>";
+
+ final EXIOptions fullOptions = new EXIOptions();
+ fullOptions.setAlignmentType(AlignmentType.byteAligned);
+ fullOptions.setPreserveLexicalValues(true);
+ fullOptions.setPreserveDTD(true);
+ fullOptions.setPreserveComments(true);
+ fullOptions.setPreserveNS(true);
+ fullOptions.setPreservePIs(true);
+
+ return Arrays.asList(new Object[][]{
+ {noChangeXml, new EXIOptions()},
+ {fullOptionsXml, fullOptions},
+ });
+ }
+
+ private final String controlXml;
+ private final EXIOptions exiOptions;
+
+ public NetconfStartExiMessageTest(final String controlXml, final EXIOptions exiOptions) {
+ this.controlXml = controlXml;
+ this.exiOptions = exiOptions;
+ }
+
+ @Test
+ public void testCreate() throws Exception {
+ final NetconfStartExiMessage startExiMessage = NetconfStartExiMessage.create(exiOptions, "id");
+
+ XMLUnit.setIgnoreWhitespace(true);
+ XMLUnit.setIgnoreAttributeOrder(true);
+ final Diff diff = XMLUnit.compareXML(XMLUnit.buildControlDocument(controlXml), startExiMessage.getDocument());
+ assertTrue(diff.toString(), diff.similar());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import java.nio.channels.WritePendingException;
+import org.apache.sshd.ClientChannel;
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.SshClient;
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.future.ConnectFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+public class AsyncSshHandlerTest {
+
+ @Mock
+ private SshClient sshClient;
+ @Mock
+ private AuthenticationHandler authHandler;
+ @Mock
+ private ChannelHandlerContext ctx;
+ @Mock
+ private Channel channel;
+ @Mock
+ private SocketAddress remoteAddress;
+ @Mock
+ private SocketAddress localAddress;
+
+ private AsyncSshHandler asyncSshHandler;
+
+ private SshFutureListener<ConnectFuture> sshConnectListener;
+ private SshFutureListener<AuthFuture> sshAuthListener;
+ private SshFutureListener<OpenFuture> sshChannelOpenListener;
+
+ private ChannelPromise promise;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ stubAuth();
+ stubSshClient();
+ stubChannel();
+ stubCtx();
+ stubRemoteAddress();
+
+ promise = getMockedPromise();
+
+ asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ sshConnectListener = null;
+ sshAuthListener = null;
+ sshChannelOpenListener = null;
+ promise = null;
+ asyncSshHandler.close(ctx, getMockedPromise());
+ }
+
+ private void stubAuth() throws IOException {
+ doReturn("usr").when(authHandler).getUsername();
+
+ final AuthFuture authFuture = mock(AuthFuture.class);
+ Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<AuthFuture> result) {
+ sshAuthListener = result;
+ }
+ });
+ doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
+ final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(future).addListener(any(SshFutureListener.class));
+
+ return listenerSettableFuture;
+ }
+
+ private void stubRemoteAddress() {
+ doReturn("remote").when(remoteAddress).toString();
+ }
+
+ private void stubCtx() {
+ doReturn(channel).when(ctx).channel();
+ doReturn(ctx).when(ctx).fireChannelActive();
+ doReturn(ctx).when(ctx).fireChannelInactive();
+ doReturn(ctx).when(ctx).fireChannelRead(anyObject());
+ doReturn(getMockedPromise()).when(ctx).newPromise();
+ }
+
+ private void stubChannel() {
+ doReturn("channel").when(channel).toString();
+ }
+
+ private void stubSshClient() {
+ doNothing().when(sshClient).start();
+ final ConnectFuture connectFuture = mock(ConnectFuture.class);
+ Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<ConnectFuture> result) {
+ sshConnectListener = result;
+ }
+ });
+ doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
+ }
+
+ @Test
+ public void testConnectSuccess() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
+
+ verify(promise).setSuccess();
+ verifyNoMoreInteractions(promise);
+ verify(ctx).fireChannelActive();
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(ctx).fireChannelRead(any(ByteBuf.class));
+ }
+
+ @Test
+ public void testReadClosed() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoReadFuture mockedReadFuture = asyncOut.read(null);
+
+ Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+ doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
+ doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+ doReturn(true).when(asyncOut).isClosing();
+ doReturn(true).when(asyncOut).isClosed();
+ result.operationComplete(mockedReadFuture);
+ }
+ });
+
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ public void testReadFail() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoReadFuture mockedReadFuture = asyncOut.read(null);
+
+ Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+ doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
+ doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+ result.operationComplete(mockedReadFuture);
+ }
+ });
+
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise writePromise = getMockedPromise();
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
+
+ verify(writePromise).setSuccess();
+ }
+
+ @Test
+ public void testWriteClosed() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+
+ final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+ Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
+ doReturn(false).when(ioWriteFuture).isWritten();
+ doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
+ doReturn(true).when(asyncIn).isClosing();
+ doReturn(true).when(asyncIn).isClosed();
+ result.operationComplete(ioWriteFuture);
+ }
+ });
+
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise writePromise = getMockedPromise();
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
+
+ verify(writePromise).setFailure(any(Throwable.class));
+ }
+
+ @Test
+ public void testWritePendingOne() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise firstWritePromise = getMockedPromise();
+
+ // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
+ final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
+ // intercept second listener, this is the listener for pending write for the pending write to know when pending state ended
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
+
+ final ChannelPromise secondWritePromise = getMockedPromise();
+ // now make write throw pending exception
+ doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
+
+ doReturn(ioWriteFuture).when(asyncIn).write(any(Buffer.class));
+
+ verifyZeroInteractions(firstWritePromise, secondWritePromise);
+
+ // make first write stop pending
+ firstWriteListener.operationComplete(ioWriteFuture);
+ // intercept third listener, this is regular listener for second write to determine success or failure
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> afterPendingListener = stubAddListener(ioWriteFuture);
+
+ // notify listener for second write that pending has ended
+ pendingListener.get().operationComplete(ioWriteFuture);
+ // Notify third listener (regular listener for second write) that second write succeeded
+ afterPendingListener.get().operationComplete(ioWriteFuture);
+
+ // verify both write promises successful
+ verify(firstWritePromise).setSuccess();
+ verify(secondWritePromise).setSuccess();
+ }
+
+ @Test
+ public void testWritePendingMax() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final IoWriteFuture ioWriteFuture = asyncIn.write(null);
+
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise firstWritePromise = getMockedPromise();
+
+ // intercept listener for first write, so we can invoke successful write later thus simulate pending of the first write
+ final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture = stubAddListener(ioWriteFuture);
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
+
+ final ChannelPromise secondWritePromise = getMockedPromise();
+ // now make write throw pending exception
+ doThrow(org.apache.sshd.common.io.WritePendingException.class).when(asyncIn).write(any(Buffer.class));
+ for (int i = 0; i < 1000; i++) {
+ asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
+ }
+
+ verify(ctx).fireChannelInactive();
+ }
+
+ @Test
+ public void testDisconnect() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+ sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
+
+ final ChannelPromise disconnectPromise = getMockedPromise();
+ asyncSshHandler.disconnect(ctx, disconnectPromise);
+
+ verify(sshSession).close(anyBoolean());
+ verify(disconnectPromise).setSuccess();
+ verify(ctx).fireChannelInactive();
+ }
+
+ private OpenFuture getSuccessOpenFuture() {
+ final OpenFuture failedOpenFuture = mock(OpenFuture.class);
+ doReturn(true).when(failedOpenFuture).isOpened();
+ return failedOpenFuture;
+ }
+
+ private AuthFuture getSuccessAuthFuture() {
+ final AuthFuture authFuture = mock(AuthFuture.class);
+ doReturn(true).when(authFuture).isSuccess();
+ return authFuture;
+ }
+
+ private ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
+ final ConnectFuture connectFuture = mock(ConnectFuture.class);
+ doReturn(true).when(connectFuture).isConnected();
+
+ doReturn(sshSession).when(connectFuture).getSession();
+ return connectFuture;
+ }
+
+ private ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
+ final ClientSession sshSession = mock(ClientSession.class);
+
+ doReturn("sshSession").when(sshSession).toString();
+ doReturn("serverVersion").when(sshSession).getServerVersion();
+ doReturn(false).when(sshSession).isClosed();
+ doReturn(false).when(sshSession).isClosing();
+ final CloseFuture closeFuture = mock(CloseFuture.class);
+ Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<CloseFuture> result) {
+ doReturn(true).when(closeFuture).isClosed();
+ result.operationComplete(closeFuture);
+ }
+ });
+ doReturn(closeFuture).when(sshSession).close(false);
+
+ doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
+
+ return sshSession;
+ }
+
+ private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
+ final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
+ doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
+ final OpenFuture openFuture = mock(OpenFuture.class);
+
+ Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<OpenFuture> result) {
+ sshChannelOpenListener = result;
+ }
+ });
+
+ doReturn(asyncOut).when(subsystemChannel).getAsyncOut();
+
+ doReturn(openFuture).when(subsystemChannel).open();
+ doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
+ return subsystemChannel;
+ }
+
+ private IoOutputStream getMockedIoOutputStream() {
+ final IoOutputStream mock = mock(IoOutputStream.class);
+ final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
+ doReturn(ioWriteFuture).when(ioWriteFuture).addListener(Matchers.<SshFutureListener<IoWriteFuture>>any());
+ doReturn(true).when(ioWriteFuture).isWritten();
+
+ Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
+ result.operationComplete(ioWriteFuture);
+ }
+ });
+
+ doReturn(ioWriteFuture).when(mock).write(any(Buffer.class));
+ doReturn(false).when(mock).isClosed();
+ doReturn(false).when(mock).isClosing();
+ return mock;
+ }
+
+ private IoInputStream getMockedIoInputStream() {
+ final IoInputStream mock = mock(IoInputStream.class);
+ final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
+ doReturn(null).when(ioReadFuture).getException();
+ doReturn(ioReadFuture).when(ioReadFuture).removeListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+ doReturn(5).when(ioReadFuture).getRead();
+ doReturn(new Buffer(new byte[]{0, 1, 2, 3, 4})).when(ioReadFuture).getBuffer();
+ doReturn(ioReadFuture).when(ioReadFuture).addListener(Matchers.<SshFutureListener<IoReadFuture>>any());
+
+ // Always success for read
+ Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
+ @Override
+ public void onSuccess(final SshFutureListener<IoReadFuture> result) {
+ result.operationComplete(ioReadFuture);
+ }
+ });
+
+ doReturn(ioReadFuture).when(mock).read(any(Buffer.class));
+ doReturn(false).when(mock).isClosed();
+ doReturn(false).when(mock).isClosing();
+ return mock;
+ }
+
+ @Test
+ public void testConnectFailOpenChannel() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final IoInputStream asyncOut = getMockedIoInputStream();
+ final IoOutputStream asyncIn = getMockedIoOutputStream();
+ final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+ final ClientSession sshSession = getMockedSshSession(subsystemChannel);
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+
+ sshAuthListener.operationComplete(getSuccessAuthFuture());
+
+ verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
+
+ try {
+ sshChannelOpenListener.operationComplete(getFailedOpenFuture());
+ fail("Exception expected");
+ } catch (final Exception e) {
+ verify(promise).setFailure(any(Throwable.class));
+ verifyNoMoreInteractions(promise);
+ // TODO should ctx.channelInactive be called if we throw exception ?
+ }
+ }
+
+ @Test
+ public void testConnectFailAuth() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final ClientSession sshSession = mock(ClientSession.class);
+ doReturn(true).when(sshSession).isClosed();
+ final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
+
+ sshConnectListener.operationComplete(connectFuture);
+
+ final AuthFuture authFuture = getFailedAuthFuture();
+
+ try {
+ sshAuthListener.operationComplete(authFuture);
+ fail("Exception expected");
+ } catch (final Exception e) {
+ verify(promise).setFailure(any(Throwable.class));
+ verifyNoMoreInteractions(promise);
+ // TODO should ctx.channelInactive be called ?
+ }
+ }
+
+ private AuthFuture getFailedAuthFuture() {
+ final AuthFuture authFuture = mock(AuthFuture.class);
+ doReturn(false).when(authFuture).isSuccess();
+ doReturn(new IllegalStateException()).when(authFuture).getException();
+ return authFuture;
+ }
+
+ private OpenFuture getFailedOpenFuture() {
+ final OpenFuture authFuture = mock(OpenFuture.class);
+ doReturn(false).when(authFuture).isOpened();
+ doReturn(new IllegalStateException()).when(authFuture).getException();
+ return authFuture;
+ }
+
+ @Test
+ public void testConnectFail() throws Exception {
+ asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
+
+ final ConnectFuture connectFuture = getFailedConnectFuture();
+ try {
+ sshConnectListener.operationComplete(connectFuture);
+ fail("Exception expected");
+ } catch (final Exception e) {
+ verify(promise).setFailure(any(Throwable.class));
+ verifyNoMoreInteractions(promise);
+ // TODO should ctx.channelInactive be called ?
+ }
+ }
+
+ private ConnectFuture getFailedConnectFuture() {
+ final ConnectFuture connectFuture = mock(ConnectFuture.class);
+ doReturn(false).when(connectFuture).isConnected();
+ doReturn(new IllegalStateException()).when(connectFuture).getException();
+ return connectFuture;
+ }
+
+ private ChannelPromise getMockedPromise() {
+ final ChannelPromise promise = mock(ChannelPromise.class);
+ doReturn(promise).when(promise).setSuccess();
+ doReturn(promise).when(promise).setFailure(any(Throwable.class));
+ return promise;
+ }
+
+ private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
+
+ @Override
+ public abstract void onSuccess(final SshFutureListener<T> result);
+
+ @Override
+ public void onFailure(final Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+}