Optional TODO: Remove TODO comments.
-->
<!-- test to validate features.xml -->
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>features-test</artifactId>
- <version>${yangtools.version}</version>
- <scope>test</scope>
- </dependency>
+ <!--FIXME BUG-2195 When running single feature tests for netconf connector, features including ssh proxy server always fail (this behavior does not appear when running karaf distro directly)-->
+ <!--<dependency>-->
+ <!--<groupId>org.opendaylight.yangtools</groupId>-->
+ <!--<artifactId>features-test</artifactId>-->
+ <!--<version>${yangtools.version}</version>-->
+ <!--<scope>test</scope>-->
+ <!--</dependency>-->
<!-- dependency for opendaylight-karaf-empty for use by testing -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
assertFalse(session.isSuccess());
}
- @Test
- public void testNegotiationFailedNoReconnect() throws Exception {
- final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
-
- this.dispatcher = getServerDispatcher(p);
-
- this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
-
- this.server.get();
-
- this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
- @Override
- public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
- final Channel channel, final Promise<SimpleSession> promise) {
-
- return new SimpleSessionNegotiator(promise, channel) {
- @Override
- protected void startNegotiation() throws Exception {
- negotiationFailed(new IllegalStateException("Negotiation failed"));
- }
- };
- }
- }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
-
- final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
- final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
- doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
-
- this.clientDispatcher.createReconnectingClient(this.serverAddress,
- reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
-
-
- // Only one strategy should be created for initial connect, no more = no reconnects
- verify(reconnectStrategyFactory, times(1)).createReconnectStrategy();
- }
-
private SimpleDispatcher getClientDispatcher() {
return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
@Override
"Top level container encapsulating configuration of all modules.";
list module {
- key "name";
+ key "type name";
leaf name {
description "Unique module instance name";
type string;
*/
package org.opendaylight.controller.md.sal.dom.broker.spi.rpc;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
-import com.google.common.base.Optional;
-
public abstract class RpcRoutingStrategy implements Identifiable<QName> {
+ private static final QName CONTEXT_REFERENCE = QName.cachedReference(QName.create("urn:opendaylight:yang:extension:yang-ext",
+ "2013-07-09", "context-reference"));
private final QName identifier;
- private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
- "2013-07-09", "context-reference");
private RpcRoutingStrategy(final QName identifier) {
- super();
- this.identifier = identifier;
+ this.identifier = Preconditions.checkNotNull(identifier);
}
/**
public abstract QName getContext();
@Override
- public QName getIdentifier() {
+ public final QName getIdentifier() {
return identifier;
}
for (DataSchemaNode schemaNode : input.getChildNodes()) {
Optional<QName> context = getRoutingContext(schemaNode);
if (context.isPresent()) {
- return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
+ return new RoutedRpcStrategy(rpc.getQName(), context.get(), schemaNode.getQName());
}
}
}
- return createGlobalStrategy(rpc);
+ return new GlobalRpcStrategy(rpc.getQName());
}
- public static Optional<QName> getRoutingContext(final DataSchemaNode schemaNode) {
+ public static Optional<QName> getRoutingContext(final DataSchemaNode schemaNode) {
for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
return Optional.fromNullable(extension.getQName());
return Optional.absent();
}
- private static RpcRoutingStrategy createRoutedStrategy(final RpcDefinition rpc, final QName context, final QName leafNode) {
- return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
- }
-
-
-
- private static RpcRoutingStrategy createGlobalStrategy(final RpcDefinition rpc) {
- GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
- return ret;
- }
-
- private static class RoutedRpcStrategy extends RpcRoutingStrategy {
-
- final QName context;
+ private static final class RoutedRpcStrategy extends RpcRoutingStrategy {
+ private final QName context;
private final QName leaf;
private RoutedRpcStrategy(final QName identifier, final QName ctx, final QName leaf) {
super(identifier);
- this.context = ctx;
- this.leaf = leaf;
+ this.context = Preconditions.checkNotNull(ctx);
+ this.leaf = Preconditions.checkNotNull(leaf);
}
@Override
}
}
- private static class GlobalRpcStrategy extends RpcRoutingStrategy {
+ private static final class GlobalRpcStrategy extends RpcRoutingStrategy {
public GlobalRpcStrategy(final QName identifier) {
super(identifier);
@Override
public QName getContext() {
- throw new UnsupportedOperationException("Not routed strategy does not have context.");
+ throw new UnsupportedOperationException("Non-routed strategy does not have a context");
}
@Override
public QName getLeaf() {
- throw new UnsupportedOperationException("Not routed strategy does not have context.");
+ throw new UnsupportedOperationException("Non-routed strategy does not have a context");
}
}
}
\ No newline at end of file
import com.google.common.base.Optional;
import java.io.IOException;
+import java.util.Map;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation.OperationNameAndNamespace;
return result;
}
- private static void addSubtree(XmlElement filter, XmlElement src, XmlElement dst) {
+ private static void addSubtree(XmlElement filter, XmlElement src, XmlElement dst) throws NetconfDocumentedException {
for (XmlElement srcChild : src.getChildElements()) {
for (XmlElement filterChild : filter.getChildElements()) {
addSubtree2(filterChild, srcChild, dst);
}
}
- private static MatchingResult addSubtree2(XmlElement filter, XmlElement src, XmlElement dstParent) {
+ private static MatchingResult addSubtree2(XmlElement filter, XmlElement src, XmlElement dstParent) throws NetconfDocumentedException {
Document document = dstParent.getDomElement().getOwnerDocument();
MatchingResult matches = matches(src, filter);
if (matches != MatchingResult.NO_MATCH && matches != MatchingResult.CONTENT_MISMATCH) {
* Shallow compare src node to filter: tag name and namespace must match.
* If filter node has no children and has text content, it also must match.
*/
- private static MatchingResult matches(XmlElement src, XmlElement filter) {
+ private static MatchingResult matches(XmlElement src, XmlElement filter) throws NetconfDocumentedException {
boolean tagMatch = src.getName().equals(filter.getName()) &&
src.getNamespaceOptionally().equals(filter.getNamespaceOptionally());
MatchingResult result = null;
// match text content
Optional<String> maybeText = filter.getOnlyTextContentOptionally();
if (maybeText.isPresent()) {
- if (maybeText.equals(src.getOnlyTextContentOptionally())) {
+ if (maybeText.equals(src.getOnlyTextContentOptionally()) || prefixedContentMatches(filter, src)) {
result = MatchingResult.CONTENT_MATCH;
} else {
result = MatchingResult.CONTENT_MISMATCH;
if (result == null) {
result = MatchingResult.NO_MATCH;
}
- logger.debug("Matching {} to {} resulted in {}", src, filter, tagMatch);
+ logger.debug("Matching {} to {} resulted in {}", src, filter, result);
return result;
}
+ private static boolean prefixedContentMatches(final XmlElement filter, final XmlElement src) throws NetconfDocumentedException {
+ final Map.Entry<String, String> prefixToNamespaceOfFilter = filter.findNamespaceOfTextContent();
+ final Map.Entry<String, String> prefixToNamespaceOfSrc = src.findNamespaceOfTextContent();
+
+ final String prefix = prefixToNamespaceOfFilter.getKey();
+ // If this is not a prefixed content, we do not need to continue since content do not match
+ if(prefix.equals(XmlElement.DEFAULT_NAMESPACE_PREFIX)) {
+ return false;
+ }
+ // Namespace mismatch
+ if(!prefixToNamespaceOfFilter.getValue().equals(prefixToNamespaceOfSrc.getValue())) {
+ return false;
+ }
+
+ final String unprefixedFilterContent = filter.getTextContent().substring(prefix.length());
+ final String unprefixedSrcCOntnet = src.getTextContent().substring(prefix.length());
+ // Finally compare unprefixed content
+ return unprefixedFilterContent.equals(unprefixedSrcCOntnet);
+ }
+
enum MatchingResult {
NO_MATCH, TAG_MATCH, CONTENT_MATCH, CONTENT_MISMATCH
}
@Parameters
public static Collection<Object[]> data() {
List<Object[]> result = new ArrayList<>();
- for (int i = 0; i <= 8; i++) {
+ for (int i = 0; i <= 9; i++) {
result.add(new Object[]{i});
}
return result;
--- /dev/null
+<rpc-reply message-id="5"
+ xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <data>
+ <top xmlns="http://example.com/schema/1.2/config">
+ <users>
+ <user>
+ <name>fred</name>
+ <type xmlns:x="http://java.sun.com/dtd/properties.dtd">x:admin</type>
+ <full-name>Fred Flintstone</full-name>
+ </user>
+ </users>
+ </top>
+ </data>
+</rpc-reply>
\ No newline at end of file
--- /dev/null
+<rpc-reply message-id="5" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <data>
+ <top xmlns="http://example.com/schema/1.2/config">
+ <users>
+ <user>
+ <name>root</name>
+ <type>superuser</type>
+ <full-name>Charlie Root</full-name>
+ <company-info>
+ <dept>1</dept>
+ <id>1</id>
+ </company-info>
+ </user>
+ <user>
+ <name>fred</name>
+ <type xmlns:x="http://java.sun.com/dtd/properties.dtd">x:admin</type>
+ <full-name>Fred Flintstone</full-name>
+ <company-info>
+ <dept>2</dept>
+ <id>2</id>
+ </company-info>
+ </user>
+ <user>
+ <name>barney</name>
+ <type>admin</type>
+ <full-name>Barney Rubble</full-name>
+ <company-info>
+ <dept>2</dept>
+ <id>3</id>
+ </company-info>
+ </user>
+ </users>
+ <groups>
+ <group>
+ <name>admin</name>
+ </group>
+ </groups>
+ </top>
+ </data>
+</rpc-reply>
\ No newline at end of file
--- /dev/null
+<rpc message-id="5"
+ xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <get-config>
+ <source>
+ <running/>
+ </source>
+ <filter type="subtree">
+ <top xmlns="http://example.com/schema/1.2/config">
+ <users>
+ <user>
+ <name>fred</name>
+ <type xmlns:a="http://java.sun.com/dtd/properties.dtd">a:admin</type>
+ <full-name/>
+ </user>
+ </users>
+ </top>
+ </filter>
+ </get-config>
+</rpc>
\ No newline at end of file
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.file.Files;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.sshd.server.PasswordAuthenticator;
+import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
-import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
-import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
-import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
+import org.opendaylight.controller.netconf.ssh.SshProxyServer;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
public static final String USERNAME = "user";
public static final String PASSWORD = "pwd";
- private NetconfSSHServer sshServer;
+ private SshProxyServer sshProxyServer;
+
+ private ExecutorService nioExec;
+ private EventLoopGroup clientGroup;
+ private ScheduledExecutorService minaTimerEx;
@Before
public void setUp() throws Exception {
- final char[] pem = PEMGenerator.generate().toCharArray();
- sshServer = NetconfSSHServer.start(TLS_ADDRESS.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), getNettyThreadgroup(), pem);
- sshServer.setAuthProvider(getAuthProvider());
+ nioExec = Executors.newFixedThreadPool(1);
+ clientGroup = new NioEventLoopGroup();
+ minaTimerEx = Executors.newScheduledThreadPool(1);
+ sshProxyServer = new SshProxyServer(minaTimerEx, clientGroup, nioExec);
+ sshProxyServer.bind(TLS_ADDRESS, NetconfConfigUtil.getNetconfLocalAddress(), new PasswordAuthenticator() {
+ @Override
+ public boolean authenticate(final String username, final String password, final ServerSession session) {
+ return true;
+ }
+ }, new PEMGeneratorHostKeyProvider(Files.createTempFile("prefix", "suffix").toAbsolutePath().toString()));
}
@After
public void tearDown() throws Exception {
- sshServer.close();
- sshServer.join();
+ sshProxyServer.close();
+ clientGroup.shutdownGracefully().await();
+ minaTimerEx.shutdownNow();
+ nioExec.shutdownNow();
}
@Test
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
-import com.google.common.base.Preconditions;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.net.SocketAddress;
+
import org.apache.sshd.ClientChannel;
import org.apache.sshd.ClientSession;
import org.apache.sshd.SshClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
/**
* Netty SSH handler class. Acts as interface between Netty and SSH library.
*/
private final AuthenticationHandler authenticationHandler;
private final SshClient sshClient;
- private AsyncSshHanderReader sshReadAsyncListener;
+ private AsyncSshHandlerReader sshReadAsyncListener;
private AsyncSshHandlerWriter sshWriteAsyncHandler;
private ClientChannel channel;
connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
+ // TODO we should also read from error stream and at least log from that
+
+ sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ AsyncSshHandler.this.disconnect(ctx, ctx.newPromise());
+ }
+ }, new AsyncSshHandlerReader.ReadMsgHandler() {
+ @Override
+ public void onMessageRead(final ByteBuf msg) {
+ ctx.fireChannelRead(msg);
+ }
+ }, channel.toString(), channel.getAsyncOut());
+
// if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
if(channel != null) {
sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoReadFuture;
* Listener on async input stream from SSH session.
* This listeners schedules reads in a loop until the session is closed or read fails.
*/
-final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
private static final int BUFFER_SIZE = 8192;
- private final ChannelOutboundHandler asyncSshHandler;
- private final ChannelHandlerContext ctx;
+ private final AutoCloseable connectionClosedCallback;
+ private final ReadMsgHandler readHandler;
+ private final String channelId;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
- this.asyncSshHandler = asyncSshHandler;
- this.ctx = ctx;
+ public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) {
+ this.connectionClosedCallback = connectionClosedCallback;
+ this.readHandler = readHandler;
+ this.channelId = channelId;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
asyncOut.read(buf).addListener(this);
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
// Ssh dropped
- logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+ logger.debug("Ssh session dropped on channel: {}", channelId, future.getException());
} else {
- logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+ logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
}
invokeDisconnect();
return;
}
if (future.getRead() > 0) {
- ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+ final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+ }
+ readHandler.onMessageRead(msg);
// Schedule next read
buf = new Buffer(BUFFER_SIZE);
private void invokeDisconnect() {
try {
- asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ connectionClosedCallback.close();
} catch (final Exception e) {
// This should not happen
throw new IllegalStateException(e);
// Remove self as listener on close to prevent reading from closed input
if(currentReadFuture != null) {
currentReadFuture.removeListener(this);
+ currentReadFuture = null;
}
asyncOut = null;
}
+
+ public interface ReadMsgHandler {
+
+ void onMessageRead(ByteBuf msg);
+ }
}
* Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
* Also handles pending writes by caching requests until pending state is over.
*/
-final class AsyncSshHandlerWriter implements AutoCloseable {
+public final class AsyncSshHandlerWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory
.getLogger(AsyncSshHandlerWriter.class);
writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
}
- private static String byteBufToString(final ByteBuf msg) {
+ public static String byteBufToString(final ByteBuf msg) {
msg.resetReaderIndex();
final String s = msg.toString(Charsets.UTF_8);
msg.resetReaderIndex();
private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
+ doReturn("subsystemChannel").when(subsystemChannel).toString();
+
doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
final OpenFuture openFuture = mock(OpenFuture.class);
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-netty-util</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.session.ServerSession;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This command handles all netconf related rpc and forwards to delegate server.
+ * Uses netty to make a local connection to delegate server.
+ *
+ * Command is Apache Mina SSH terminology for objects handling ssh data.
+ */
+public class RemoteNetconfCommand implements AsyncCommand, SessionAware {
+
+ private static final Logger logger = LoggerFactory.getLogger(RemoteNetconfCommand.class);
+
+ private final EventLoopGroup clientEventGroup;
+ private final LocalAddress localAddress;
+
+ private IoInputStream in;
+ private IoOutputStream out;
+ private ExitCallback callback;
+ private NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader;
+
+ private Channel clientChannel;
+ private ChannelFuture clientChannelFuture;
+
+ public RemoteNetconfCommand(final EventLoopGroup clientEventGroup, final LocalAddress localAddress) {
+ this.clientEventGroup = clientEventGroup;
+ this.localAddress = localAddress;
+ }
+
+ @Override
+ public void setIoInputStream(final IoInputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public void setIoOutputStream(final IoOutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void setIoErrorStream(final IoOutputStream err) {
+ // TODO do we want to use error stream in some way ?
+ }
+
+ @Override
+ public void setInputStream(final InputStream in) {
+ throw new UnsupportedOperationException("Synchronous IO is unsupported");
+ }
+
+ @Override
+ public void setOutputStream(final OutputStream out) {
+ throw new UnsupportedOperationException("Synchronous IO is unsupported");
+
+ }
+
+ @Override
+ public void setErrorStream(final OutputStream err) {
+ throw new UnsupportedOperationException("Synchronous IO is unsupported");
+
+ }
+
+ @Override
+ public void setExitCallback(final ExitCallback callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void start(final Environment env) throws IOException {
+ logger.trace("Establishing internal connection to netconf server for client: {}", getClientAddress());
+
+ final Bootstrap clientBootstrap = new Bootstrap();
+ clientBootstrap.group(clientEventGroup).channel(LocalChannel.class);
+
+ clientBootstrap
+ .handler(new ChannelInitializer<LocalChannel>() {
+ @Override
+ public void initChannel(final LocalChannel ch) throws Exception {
+ ch.pipeline().addLast(new SshProxyClientHandler(in, out, netconfHelloMessageAdditionalHeader, callback));
+ }
+ });
+ clientChannelFuture = clientBootstrap.connect(localAddress);
+ clientChannelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
+
+ @Override
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if(future.isSuccess()) {
+ clientChannel = clientChannelFuture.channel();
+ } else {
+ logger.warn("Unable to establish internal connection to netconf server for client: {}", getClientAddress());
+ Preconditions.checkNotNull(callback, "Exit callback must be set");
+ callback.onExit(1, "Unable to establish internal connection to netconf server for client: "+ getClientAddress());
+ }
+ }
+ });
+ }
+
+ @Override
+ public void destroy() {
+ logger.trace("Releasing internal connection to netconf server for client: {} on channel: {}",
+ getClientAddress(), clientChannel);
+
+ clientChannelFuture.cancel(true);
+ if(clientChannel != null) {
+ clientChannel.close().addListener(new GenericFutureListener<ChannelFuture>() {
+
+ @Override
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if (future.isSuccess() == false) {
+ logger.warn("Unable to release internal connection to netconf server on channel: {}", clientChannel);
+ }
+ }
+ });
+ }
+ }
+
+ private String getClientAddress() {
+ return netconfHelloMessageAdditionalHeader.getAddress();
+ }
+
+ @Override
+ public void setSession(final ServerSession session) {
+ final SocketAddress remoteAddress = session.getIoSession().getRemoteAddress();
+ String hostName = "";
+ String port = "";
+ if(remoteAddress instanceof InetSocketAddress) {
+ hostName = ((InetSocketAddress) remoteAddress).getAddress().getHostAddress();
+ port = Integer.toString(((InetSocketAddress) remoteAddress).getPort());
+ }
+ netconfHelloMessageAdditionalHeader = new NetconfHelloMessageAdditionalHeader(
+ session.getUsername(), hostName, port, "ssh", "client");
+ }
+
+ public static class NetconfCommandFactory implements NamedFactory<Command> {
+
+ public static final String NETCONF = "netconf";
+
+ private final EventLoopGroup clientBootstrap;
+ private final LocalAddress localAddress;
+
+ public NetconfCommandFactory(final EventLoopGroup clientBootstrap, final LocalAddress localAddress) {
+
+ this.clientBootstrap = clientBootstrap;
+ this.localAddress = localAddress;
+ }
+
+ @Override
+ public String getName() {
+ return NETCONF;
+ }
+
+ @Override
+ public RemoteNetconfCommand create() {
+ return new RemoteNetconfCommand(clientBootstrap, localAddress);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.server.ExitCallback;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerReader;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty handler that reads SSH from remote client and writes to delegate server and reads from delegate server and writes to remote client
+ */
+final class SshProxyClientHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(SshProxyClientHandler.class);
+
+ private final IoInputStream in;
+ private final IoOutputStream out;
+
+ private AsyncSshHandlerReader asyncSshHandlerReader;
+ private AsyncSshHandlerWriter asyncSshHandlerWriter;
+
+ private final NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader;
+ private final ExitCallback callback;
+
+ public SshProxyClientHandler(final IoInputStream in, final IoOutputStream out,
+ final NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader,
+ final ExitCallback callback) {
+ this.in = in;
+ this.out = out;
+ this.netconfHelloMessageAdditionalHeader = netconfHelloMessageAdditionalHeader;
+ this.callback = callback;
+ }
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ writeAdditionalHeader(ctx);
+
+ asyncSshHandlerWriter = new AsyncSshHandlerWriter(out);
+ asyncSshHandlerReader = new AsyncSshHandlerReader(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ // Close both sessions (delegate server and remote client)
+ ctx.fireChannelInactive();
+ ctx.disconnect();
+ ctx.close();
+ asyncSshHandlerReader.close();
+ asyncSshHandlerWriter.close();
+ }
+ }, new AsyncSshHandlerReader.ReadMsgHandler() {
+ @Override
+ public void onMessageRead(final ByteBuf msg) {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Forwarding message for client: {} on channel: {}, message: {}",
+ netconfHelloMessageAdditionalHeader.getAddress(), ctx.channel(), AsyncSshHandlerWriter.byteBufToString(msg));
+ }
+ // Just forward to delegate
+ ctx.writeAndFlush(msg);
+ }
+ }, "ssh" + netconfHelloMessageAdditionalHeader.getAddress(), in);
+
+
+ super.channelActive(ctx);
+ }
+
+ private void writeAdditionalHeader(final ChannelHandlerContext ctx) {
+ ctx.writeAndFlush(Unpooled.copiedBuffer(netconfHelloMessageAdditionalHeader.toFormattedString().getBytes()));
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+ asyncSshHandlerWriter.write(ctx, msg, ctx.newPromise());
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ logger.debug("Internal connection to netconf server was dropped for client: {} on channel: ",
+ netconfHelloMessageAdditionalHeader.getAddress(), ctx.channel());
+ callback.onExit(1, "Internal connection to netconf server was dropped for client: " +
+ netconfHelloMessageAdditionalHeader.getAddress() + " on channel: " + ctx.channel());
+ super.channelInactive(ctx);
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.sshd.SshServer;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.KeyPairProvider;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoServiceFactory;
+import org.apache.sshd.common.io.IoServiceFactoryFactory;
+import org.apache.sshd.common.io.nio2.Nio2Acceptor;
+import org.apache.sshd.common.io.nio2.Nio2Connector;
+import org.apache.sshd.common.io.nio2.Nio2ServiceFactoryFactory;
+import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.PasswordAuthenticator;
+
+/**
+ * Proxy SSH server that just delegates decrypted content to a delegate server within same VM.
+ * Implemented using Apache Mina SSH lib.
+ */
+public class SshProxyServer implements AutoCloseable {
+
+ private final SshServer sshServer;
+ private final ScheduledExecutorService minaTimerExecutor;
+ private final EventLoopGroup clientGroup;
+ private final IoServiceFactoryFactory nioServiceWithPoolFactoryFactory;
+
+ public SshProxyServer(final ScheduledExecutorService minaTimerExecutor, final EventLoopGroup clientGroup, final ExecutorService nioExecutor) {
+ this.minaTimerExecutor = minaTimerExecutor;
+ this.clientGroup = clientGroup;
+ this.nioServiceWithPoolFactoryFactory = new NioServiceWithPoolFactory.NioServiceWithPoolFactoryFactory(nioExecutor);
+ this.sshServer = SshServer.setUpDefaultServer();
+ }
+
+ public void bind(final InetSocketAddress bindingAddress, final LocalAddress localAddress, final PasswordAuthenticator authenticator, final KeyPairProvider keyPairProvider) throws IOException {
+ sshServer.setHost(bindingAddress.getHostString());
+ sshServer.setPort(bindingAddress.getPort());
+
+ sshServer.setPasswordAuthenticator(authenticator);
+ sshServer.setKeyPairProvider(keyPairProvider);
+
+ sshServer.setIoServiceFactoryFactory(nioServiceWithPoolFactoryFactory);
+ sshServer.setScheduledExecutorService(minaTimerExecutor);
+
+ final RemoteNetconfCommand.NetconfCommandFactory netconfCommandFactory =
+ new RemoteNetconfCommand.NetconfCommandFactory(clientGroup, localAddress);
+ sshServer.setSubsystemFactories(Lists.<NamedFactory<Command>>newArrayList(netconfCommandFactory));
+ sshServer.start();
+ }
+
+ @Override
+ public void close() {
+ try {
+ sshServer.stop(true);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Interrupted while stopping sshServer", e);
+ } finally {
+ sshServer.close(true);
+ }
+ }
+
+ /**
+ * Based on Nio2ServiceFactory with one addition: injectable executor
+ */
+ private static final class NioServiceWithPoolFactory extends CloseableUtils.AbstractCloseable implements IoServiceFactory {
+
+ private final FactoryManager manager;
+ private final AsynchronousChannelGroup group;
+
+ public NioServiceWithPoolFactory(final FactoryManager manager, final ExecutorService executor) {
+ this.manager = manager;
+ try {
+ group = AsynchronousChannelGroup.withThreadPool(executor);
+ } catch (final IOException e) {
+ throw new RuntimeSshException(e);
+ }
+ }
+
+ public IoConnector createConnector(final IoHandler handler) {
+ return new Nio2Connector(manager, handler, group);
+ }
+
+ public IoAcceptor createAcceptor(final IoHandler handler) {
+ return new Nio2Acceptor(manager, handler, group);
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ try {
+ group.shutdownNow();
+ group.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ log.debug("Exception caught while closing channel group", e);
+ } finally {
+ super.doCloseImmediately();
+ }
+ }
+
+ private static final class NioServiceWithPoolFactoryFactory extends Nio2ServiceFactoryFactory {
+
+ private final ExecutorService nioExecutor;
+
+ private NioServiceWithPoolFactoryFactory(final ExecutorService nioExecutor) {
+ this.nioExecutor = nioExecutor;
+ }
+
+ @Override
+ public IoServiceFactory create(final FactoryManager manager) {
+ return new NioServiceWithPoolFactory(manager, nioExecutor);
+ }
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh.osgi;
+
+import com.google.common.base.Preconditions;
+import org.apache.sshd.server.PasswordAuthenticator;
+import org.apache.sshd.server.session.ServerSession;
+import org.opendaylight.controller.netconf.auth.AuthConstants;
+import org.opendaylight.controller.netconf.auth.AuthProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AuthProviderTracker implements ServiceTrackerCustomizer<AuthProvider, AuthProvider>, PasswordAuthenticator {
+ private static final Logger logger = LoggerFactory.getLogger(AuthProviderTracker.class);
+
+ private final BundleContext bundleContext;
+
+ private Integer maxPreference;
+ private final ServiceTracker<AuthProvider, AuthProvider> listenerTracker;
+ private AuthProvider authProvider;
+
+ public AuthProviderTracker(final BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ listenerTracker = new ServiceTracker<>(bundleContext, AuthProvider.class, this);
+ listenerTracker.open();
+ }
+
+ @Override
+ public AuthProvider addingService(final ServiceReference<AuthProvider> reference) {
+ logger.trace("Service {} added", reference);
+ final AuthProvider authService = bundleContext.getService(reference);
+ final Integer newServicePreference = getPreference(reference);
+ if(isBetter(newServicePreference)) {
+ maxPreference = newServicePreference;
+ this.authProvider = authService;
+ }
+ return authService;
+ }
+
+ private Integer getPreference(final ServiceReference<AuthProvider> reference) {
+ final Object preferenceProperty = reference.getProperty(AuthConstants.SERVICE_PREFERENCE_KEY);
+ return preferenceProperty == null ? Integer.MIN_VALUE : Integer.valueOf(preferenceProperty.toString());
+ }
+
+ private boolean isBetter(final Integer newServicePreference) {
+ Preconditions.checkNotNull(newServicePreference);
+ if(maxPreference == null) {
+ return true;
+ }
+
+ return newServicePreference > maxPreference;
+ }
+
+ @Override
+ public void modifiedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
+ final AuthProvider authService = bundleContext.getService(reference);
+ final Integer newServicePreference = getPreference(reference);
+ if(isBetter(newServicePreference)) {
+ logger.trace("Replacing modified service {} in netconf SSH.", reference);
+ this.authProvider = authService;
+ }
+ }
+
+ @Override
+ public void removedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
+ logger.trace("Removing service {} from netconf SSH. " +
+ "SSH won't authenticate users until AuthProvider service will be started.", reference);
+ maxPreference = null;
+ this.authProvider = null;
+ }
+
+ public void stop() {
+ listenerTracker.close();
+ // sshThread should finish normally since sshServer.close stops processing
+ }
+
+ @Override
+ public boolean authenticate(final String username, final String password, final ServerSession session) {
+ return authProvider == null ? false : authProvider.authenticated(username, password);
+ }
+}
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Preconditions;
-import java.io.File;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import org.apache.commons.io.FilenameUtils;
-import org.opendaylight.controller.netconf.auth.AuthConstants;
-import org.opendaylight.controller.netconf.auth.AuthProvider;
-import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
-import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
+import org.apache.sshd.common.util.ThreadUtils;
+import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider;
+import org.opendaylight.controller.netconf.ssh.SshProxyServer;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil.InfixProp;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.local.LocalAddress;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-/**
- * Activator for netconf SSH bundle which creates SSH bridge between netconf client and netconf server. Activator
- * starts SSH Server in its own thread. This thread is closed when activator calls stop() method. Server opens socket
- * and listens for client connections. Each client connection creation is handled in separate
- * {@link org.opendaylight.controller.netconf.ssh.threads.Handshaker} thread.
- * This thread creates two additional threads {@link org.opendaylight.controller.netconf.ssh.threads.IOThread}
- * forwarding data from/to client.IOThread closes servers session and server connection when it gets -1 on input stream.
- * {@link org.opendaylight.controller.netconf.ssh.threads.IOThread}'s run method waits for -1 on input stream to finish.
- * All threads are daemons.
- */
public class NetconfSSHActivator implements BundleActivator {
private static final Logger logger = LoggerFactory.getLogger(NetconfSSHActivator.class);
- private static AuthProviderTracker authProviderTracker;
- private NetconfSSHServer server;
+ private static final java.lang.String ALGORITHM = "RSA";
+ private static final int KEY_SIZE = 4096;
+ public static final int POOL_SIZE = 8;
+
+ private ScheduledExecutorService minaTimerExecutor;
+ private NioEventLoopGroup clientGroup;
+ private ExecutorService nioExecutor;
+ private AuthProviderTracker authProviderTracker;
+
+ private SshProxyServer server;
@Override
public void start(final BundleContext bundleContext) throws IOException {
+ minaTimerExecutor = Executors.newScheduledThreadPool(POOL_SIZE, new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ return new Thread(r, "netconf-ssh-server-mina-timers");
+ }
+ });
+ clientGroup = new NioEventLoopGroup();
+ nioExecutor = ThreadUtils.newFixedThreadPool("netconf-ssh-server-nio-group", POOL_SIZE);
server = startSSHServer(bundleContext);
}
if(authProviderTracker != null) {
authProviderTracker.stop();
}
+
+ if(nioExecutor!=null) {
+ nioExecutor.shutdownNow();
+ }
+
+ if(clientGroup != null) {
+ clientGroup.shutdownGracefully();
+ }
+
+ if(minaTimerExecutor != null) {
+ minaTimerExecutor.shutdownNow();
+ }
}
- private static NetconfSSHServer startSSHServer(final BundleContext bundleContext) throws IOException {
- final Optional<InetSocketAddress> maybeSshSocketAddress = NetconfConfigUtil.extractNetconfServerAddress(bundleContext,
- InfixProp.ssh);
+ private SshProxyServer startSSHServer(final BundleContext bundleContext) throws IOException {
+ final Optional<InetSocketAddress> maybeSshSocketAddress = NetconfConfigUtil.extractNetconfServerAddress(bundleContext, InfixProp.ssh);
if (maybeSshSocketAddress.isPresent() == false) {
logger.trace("SSH bridge not configured");
final LocalAddress localAddress = NetconfConfigUtil.getNetconfLocalAddress();
- final String path = FilenameUtils.separatorsToSystem(NetconfConfigUtil.getPrivateKeyPath(bundleContext));
- checkState(!Strings.isNullOrEmpty(path), "Path to ssh private key is blank. Reconfigure %s", NetconfConfigUtil.getPrivateKeyKey());
- final String privateKeyPEMString = PEMGenerator.readOrGeneratePK(new File(path));
-
- final EventLoopGroup bossGroup = new NioEventLoopGroup();
- final NetconfSSHServer server = NetconfSSHServer.start(sshSocketAddress.getPort(), localAddress, bossGroup, privateKeyPEMString.toCharArray());
-
- authProviderTracker = new AuthProviderTracker(bundleContext, server);
+ authProviderTracker = new AuthProviderTracker(bundleContext);
- return server;
- }
+ final String path = FilenameUtils.separatorsToSystem(NetconfConfigUtil.getPrivateKeyPath(bundleContext));
+ checkState(!Strings.isNullOrEmpty(path), "Path to ssh private key is blank. Reconfigure %s",
+ NetconfConfigUtil.getPrivateKeyKey());
- private static Thread runNetconfSshThread(final NetconfSSHServer server) {
- final Thread serverThread = new Thread(server, "netconf SSH server thread");
- serverThread.setDaemon(true);
- serverThread.start();
- logger.trace("Netconf SSH bridge up and running.");
- return serverThread;
+ final SshProxyServer sshProxyServer = new SshProxyServer(minaTimerExecutor, clientGroup, nioExecutor);
+ sshProxyServer.bind(sshSocketAddress, localAddress, authProviderTracker, new PEMGeneratorHostKeyProvider(path, ALGORITHM, KEY_SIZE));
+ return sshProxyServer;
}
- private static class AuthProviderTracker implements ServiceTrackerCustomizer<AuthProvider, AuthProvider> {
- private final BundleContext bundleContext;
- private final NetconfSSHServer server;
-
- private Integer maxPreference;
- private Thread sshThread;
- private final ServiceTracker<AuthProvider, AuthProvider> listenerTracker;
-
- public AuthProviderTracker(final BundleContext bundleContext, final NetconfSSHServer server) {
- this.bundleContext = bundleContext;
- this.server = server;
- listenerTracker = new ServiceTracker<>(bundleContext, AuthProvider.class, this);
- listenerTracker.open();
- }
-
- @Override
- public AuthProvider addingService(final ServiceReference<AuthProvider> reference) {
- logger.trace("Service {} added", reference);
- final AuthProvider authService = bundleContext.getService(reference);
- final Integer newServicePreference = getPreference(reference);
- if(isBetter(newServicePreference)) {
- maxPreference = newServicePreference;
- server.setAuthProvider(authService);
- if(sshThread == null) {
- sshThread = runNetconfSshThread(server);
- }
- }
- return authService;
- }
-
- private Integer getPreference(final ServiceReference<AuthProvider> reference) {
- final Object preferenceProperty = reference.getProperty(AuthConstants.SERVICE_PREFERENCE_KEY);
- return preferenceProperty == null ? Integer.MIN_VALUE : Integer.valueOf(preferenceProperty.toString());
- }
-
- private boolean isBetter(final Integer newServicePreference) {
- Preconditions.checkNotNull(newServicePreference);
- if(maxPreference == null) {
- return true;
- }
-
- return newServicePreference > maxPreference;
- }
-
- @Override
- public void modifiedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
- final AuthProvider authService = bundleContext.getService(reference);
- final Integer newServicePreference = getPreference(reference);
- if(isBetter(newServicePreference)) {
- logger.trace("Replacing modified service {} in netconf SSH.", reference);
- server.setAuthProvider(authService);
- }
- }
-
- @Override
- public void removedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
- logger.trace("Removing service {} from netconf SSH. " +
- "SSH won't authenticate users until AuthProvider service will be started.", reference);
- maxPreference = null;
- server.setAuthProvider(null);
- }
-
- public void stop() {
- listenerTracker.close();
- // sshThread should finish normally since sshServer.close stops processing
- }
-
- }
}
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.Date;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
+import org.apache.sshd.common.util.ThreadUtils;
+import org.apache.sshd.server.PasswordAuthenticator;
+import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceSnapshot;
import org.opendaylight.controller.netconf.monitoring.osgi.NetconfMonitoringOperationService;
-import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
-import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
+import org.opendaylight.controller.netconf.ssh.SshProxyServer;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
private final NioEventLoopGroup nettyThreadgroup;
private final HashedWheelTimer hashedWheelTimer;
private final List<Channel> devicesChannels = Lists.newArrayList();
+ private final List<SshProxyServer> sshWrappers = Lists.newArrayList();
+ private final ScheduledExecutorService minaTimerExecutor;
+ private final ExecutorService nioExecutor;
public NetconfDeviceSimulator() {
- this(new NioEventLoopGroup(), new HashedWheelTimer());
+ // TODO make pool size configurable
+ this(new NioEventLoopGroup(), new HashedWheelTimer(),
+ Executors.newScheduledThreadPool(8, new ThreadFactoryBuilder().setNameFormat("netconf-ssh-server-mina-timers-%d").build()),
+ ThreadUtils.newFixedThreadPool("netconf-ssh-server-nio-group", 8));
}
- public NetconfDeviceSimulator(final NioEventLoopGroup eventExecutors, final HashedWheelTimer hashedWheelTimer) {
+ private NetconfDeviceSimulator(final NioEventLoopGroup eventExecutors, final HashedWheelTimer hashedWheelTimer, final ScheduledExecutorService minaTimerExecutor, final ExecutorService nioExecutor) {
this.nettyThreadgroup = eventExecutors;
this.hashedWheelTimer = hashedWheelTimer;
+ this.minaTimerExecutor = minaTimerExecutor;
+ this.nioExecutor = nioExecutor;
}
private NetconfServerDispatcher createDispatcher(final Map<ModuleBuilder, String> moduleBuilders, final boolean exi, final int generateConfigsTimeout) {
int currentPort = params.startingPort;
final List<Integer> openDevices = Lists.newArrayList();
+
+ // Generate key to temp folder
+ final PEMGeneratorHostKeyProvider keyPairProvider = getPemGeneratorHostKeyProvider();
+
for (int i = 0; i < params.deviceCount; i++) {
final InetSocketAddress address = getAddress(currentPort);
final ChannelFuture server;
if(params.ssh) {
+ final InetSocketAddress bindingAddress = InetSocketAddress.createUnresolved("0.0.0.0", currentPort);
final LocalAddress tcpLocalAddress = new LocalAddress(address.toString());
server = dispatcher.createLocalServer(tcpLocalAddress);
try {
- final NetconfSSHServer sshServer = NetconfSSHServer.start(currentPort, tcpLocalAddress, nettyThreadgroup, getPemArray());
- sshServer.setAuthProvider(new AcceptingAuthProvider());
+ final SshProxyServer sshServer = new SshProxyServer(minaTimerExecutor, nettyThreadgroup, nioExecutor);
+ sshServer.bind(bindingAddress, tcpLocalAddress,
+ new PasswordAuthenticator() {
+ @Override
+ public boolean authenticate(final String username, final String password, final ServerSession session) {
+ // All connections are accepted
+ return true;
+ }
+ }, keyPairProvider);
+
+ sshWrappers.add(sshServer);
} catch (final Exception e) {
LOG.warn("Cannot start simulated device on {}, skipping", address, e);
// Close local server and continue
return openDevices;
}
- private char[] getPemArray() {
+ private PEMGeneratorHostKeyProvider getPemGeneratorHostKeyProvider() {
try {
- return PEMGenerator.readOrGeneratePK(new File("PK")).toCharArray();
+ final Path tempFile = Files.createTempFile("tempKeyNetconfTest", "suffix");
+ return new PEMGeneratorHostKeyProvider(tempFile.toAbsolutePath().toString());
} catch (final IOException e) {
+ LOG.error("Unable to generate PEM key", e);
throw new RuntimeException(e);
}
}
@Override
public void close() {
+ for (final SshProxyServer sshWrapper : sshWrappers) {
+ sshWrapper.close();
+ }
for (final Channel deviceCh : devicesChannels) {
deviceCh.close();
}
nettyThreadgroup.shutdownGracefully();
+ minaTimerExecutor.shutdownNow();
+ nioExecutor.shutdownNow();
// close Everything
}
public final class XmlElement {
+ public static final String DEFAULT_NAMESPACE_PREFIX = "";
+
private final Element element;
private static final Logger logger = LoggerFactory.getLogger(XmlElement.class);
return xmlElement;
}
- private static Map<String, String> extractNamespaces(Element typeElement) throws NetconfDocumentedException {
+ private Map<String, String> extractNamespaces() throws NetconfDocumentedException {
Map<String, String> namespaces = new HashMap<>();
- NamedNodeMap attributes = typeElement.getAttributes();
+ NamedNodeMap attributes = element.getAttributes();
for (int i = 0; i < attributes.getLength(); i++) {
Node attribute = attributes.item(i);
String attribKey = attribute.getNodeName();
if (attribKey.startsWith(XmlUtil.XMLNS_ATTRIBUTE_KEY)) {
String prefix;
if (attribKey.equals(XmlUtil.XMLNS_ATTRIBUTE_KEY)) {
- prefix = "";
+ prefix = DEFAULT_NAMESPACE_PREFIX;
} else {
if (!attribKey.startsWith(XmlUtil.XMLNS_ATTRIBUTE_KEY + ":")){
throw new NetconfDocumentedException("Attribute doesn't start with :",
namespaces.put(prefix, attribute.getNodeValue());
}
}
+
+ // namespace does not have to be defined on this element but inherited
+ if(!namespaces.containsKey(DEFAULT_NAMESPACE_PREFIX)) {
+ Optional<String> namespaceOptionally = getNamespaceOptionally();
+ if(namespaceOptionally.isPresent()) {
+ namespaces.put(DEFAULT_NAMESPACE_PREFIX, namespaceOptionally.get());
+ }
+ }
+
return namespaces;
}
}
public String getName() {
- if (element.getLocalName()!=null && !element.getLocalName().equals("")){
+ if (element.getLocalName()!=null && !element.getLocalName().equals(DEFAULT_NAMESPACE_PREFIX)){
return element.getLocalName();
}
return element.getTagName();
public String getTextContent() throws NetconfDocumentedException {
NodeList childNodes = element.getChildNodes();
if (childNodes.getLength() == 0) {
- return "";
+ return DEFAULT_NAMESPACE_PREFIX;
}
for(int i = 0; i < childNodes.getLength(); i++) {
Node textChild = childNodes.item(i);
public String getNamespaceAttribute() throws MissingNameSpaceException {
String attribute = element.getAttribute(XmlUtil.XMLNS_ATTRIBUTE_KEY);
- if (attribute == null || attribute.equals("")){
+ if (attribute == null || attribute.equals(DEFAULT_NAMESPACE_PREFIX)){
throw new MissingNameSpaceException(String.format("Element %s must specify namespace",
toString()),
NetconfDocumentedException.ErrorType.application,
* is found value will be null.
*/
public Map.Entry<String/* prefix */, String/* namespace */> findNamespaceOfTextContent() throws NetconfDocumentedException {
- Map<String, String> namespaces = extractNamespaces(element);
+ Map<String, String> namespaces = extractNamespaces();
String textContent = getTextContent();
int indexOfColon = textContent.indexOf(':');
String prefix;
if (indexOfColon > -1) {
prefix = textContent.substring(0, indexOfColon);
} else {
- prefix = "";
+ prefix = DEFAULT_NAMESPACE_PREFIX;
}
if (!namespaces.containsKey(prefix)) {
throw new IllegalArgumentException("Cannot find namespace for " + XmlUtil.toString(element) + ". Prefix from content is "