Optional TODO: Remove TODO comments.
-->
<!-- test to validate features.xml -->
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>features-test</artifactId>
- <version>${yangtools.version}</version>
- <scope>test</scope>
- </dependency>
+ <!--FIXME BUG-2195 When running single feature tests for netconf connector, features including ssh proxy server always fail (this behavior does not appear when running karaf distro directly)-->
+ <!--<dependency>-->
+ <!--<groupId>org.opendaylight.yangtools</groupId>-->
+ <!--<artifactId>features-test</artifactId>-->
+ <!--<version>${yangtools.version}</version>-->
+ <!--<scope>test</scope>-->
+ <!--</dependency>-->
<!-- dependency for opendaylight-karaf-empty for use by testing -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.file.Files;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.sshd.server.PasswordAuthenticator;
+import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider;
+import org.apache.sshd.server.session.ServerSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener;
+import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
-import org.opendaylight.controller.netconf.client.TestingNetconfClient;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
-import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
-import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
+import org.opendaylight.controller.netconf.ssh.SshProxyServer;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
public static final String USERNAME = "user";
public static final String PASSWORD = "pwd";
- private NetconfSSHServer sshServer;
+ private SshProxyServer sshProxyServer;
+
+ private ExecutorService nioExec;
+ private EventLoopGroup clientGroup;
+ private ScheduledExecutorService minaTimerEx;
@Before
public void setUp() throws Exception {
- final char[] pem = PEMGenerator.generate().toCharArray();
- sshServer = NetconfSSHServer.start(TLS_ADDRESS.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), getNettyThreadgroup(), pem);
- sshServer.setAuthProvider(getAuthProvider());
+ nioExec = Executors.newFixedThreadPool(1);
+ clientGroup = new NioEventLoopGroup();
+ minaTimerEx = Executors.newScheduledThreadPool(1);
+ sshProxyServer = new SshProxyServer(minaTimerEx, clientGroup, nioExec);
+ sshProxyServer.bind(TLS_ADDRESS, NetconfConfigUtil.getNetconfLocalAddress(), new PasswordAuthenticator() {
+ @Override
+ public boolean authenticate(final String username, final String password, final ServerSession session) {
+ return true;
+ }
+ }, new PEMGeneratorHostKeyProvider(Files.createTempFile("prefix", "suffix").toAbsolutePath().toString()));
}
@After
public void tearDown() throws Exception {
- sshServer.close();
- sshServer.join();
+ sshProxyServer.close();
+ clientGroup.shutdownGracefully().await();
+ minaTimerEx.shutdownNow();
+ nioExec.shutdownNow();
}
@Test
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
-import com.google.common.base.Preconditions;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.net.SocketAddress;
+
import org.apache.sshd.ClientChannel;
import org.apache.sshd.ClientSession;
import org.apache.sshd.SshClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
/**
* Netty SSH handler class. Acts as interface between Netty and SSH library.
*/
private final AuthenticationHandler authenticationHandler;
private final SshClient sshClient;
- private AsyncSshHanderReader sshReadAsyncListener;
+ private AsyncSshHandlerReader sshReadAsyncListener;
private AsyncSshHandlerWriter sshWriteAsyncHandler;
private ClientChannel channel;
connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
+ // TODO we should also read from error stream and at least log from that
+
+ sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ AsyncSshHandler.this.disconnect(ctx, ctx.newPromise());
+ }
+ }, new AsyncSshHandlerReader.ReadMsgHandler() {
+ @Override
+ public void onMessageRead(final ByteBuf msg) {
+ ctx.fireChannelRead(msg);
+ }
+ }, channel.toString(), channel.getAsyncOut());
+
// if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
if(channel != null) {
sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoReadFuture;
* Listener on async input stream from SSH session.
* This listeners schedules reads in a loop until the session is closed or read fails.
*/
-final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
private static final int BUFFER_SIZE = 8192;
- private final ChannelOutboundHandler asyncSshHandler;
- private final ChannelHandlerContext ctx;
+ private final AutoCloseable connectionClosedCallback;
+ private final ReadMsgHandler readHandler;
+ private final String channelId;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
- this.asyncSshHandler = asyncSshHandler;
- this.ctx = ctx;
+ public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) {
+ this.connectionClosedCallback = connectionClosedCallback;
+ this.readHandler = readHandler;
+ this.channelId = channelId;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
asyncOut.read(buf).addListener(this);
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
// Ssh dropped
- logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+ logger.debug("Ssh session dropped on channel: {}", channelId, future.getException());
} else {
- logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+ logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
}
invokeDisconnect();
return;
}
if (future.getRead() > 0) {
- ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+ final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+ }
+ readHandler.onMessageRead(msg);
// Schedule next read
buf = new Buffer(BUFFER_SIZE);
private void invokeDisconnect() {
try {
- asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ connectionClosedCallback.close();
} catch (final Exception e) {
// This should not happen
throw new IllegalStateException(e);
// Remove self as listener on close to prevent reading from closed input
if(currentReadFuture != null) {
currentReadFuture.removeListener(this);
+ currentReadFuture = null;
}
asyncOut = null;
}
+
+ public interface ReadMsgHandler {
+
+ void onMessageRead(ByteBuf msg);
+ }
}
* Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
* Also handles pending writes by caching requests until pending state is over.
*/
-final class AsyncSshHandlerWriter implements AutoCloseable {
+public final class AsyncSshHandlerWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory
.getLogger(AsyncSshHandlerWriter.class);
writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
}
- private static String byteBufToString(final ByteBuf msg) {
+ public static String byteBufToString(final ByteBuf msg) {
msg.resetReaderIndex();
final String s = msg.toString(Charsets.UTF_8);
msg.resetReaderIndex();
private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException {
final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
+ doReturn("subsystemChannel").when(subsystemChannel).toString();
+
doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
final OpenFuture openFuture = mock(OpenFuture.class);
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-netty-util</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.session.ServerSession;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This command handles all netconf related rpc and forwards to delegate server.
+ * Uses netty to make a local connection to delegate server.
+ *
+ * Command is Apache Mina SSH terminology for objects handling ssh data.
+ */
+public class RemoteNetconfCommand implements AsyncCommand, SessionAware {
+
+ private static final Logger logger = LoggerFactory.getLogger(RemoteNetconfCommand.class);
+
+ private final EventLoopGroup clientEventGroup;
+ private final LocalAddress localAddress;
+
+ private IoInputStream in;
+ private IoOutputStream out;
+ private ExitCallback callback;
+ private NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader;
+
+ private Channel clientChannel;
+ private ChannelFuture clientChannelFuture;
+
+ public RemoteNetconfCommand(final EventLoopGroup clientEventGroup, final LocalAddress localAddress) {
+ this.clientEventGroup = clientEventGroup;
+ this.localAddress = localAddress;
+ }
+
+ @Override
+ public void setIoInputStream(final IoInputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public void setIoOutputStream(final IoOutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void setIoErrorStream(final IoOutputStream err) {
+ // TODO do we want to use error stream in some way ?
+ }
+
+ @Override
+ public void setInputStream(final InputStream in) {
+ throw new UnsupportedOperationException("Synchronous IO is unsupported");
+ }
+
+ @Override
+ public void setOutputStream(final OutputStream out) {
+ throw new UnsupportedOperationException("Synchronous IO is unsupported");
+
+ }
+
+ @Override
+ public void setErrorStream(final OutputStream err) {
+ throw new UnsupportedOperationException("Synchronous IO is unsupported");
+
+ }
+
+ @Override
+ public void setExitCallback(final ExitCallback callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void start(final Environment env) throws IOException {
+ logger.trace("Establishing internal connection to netconf server for client: {}", getClientAddress());
+
+ final Bootstrap clientBootstrap = new Bootstrap();
+ clientBootstrap.group(clientEventGroup).channel(LocalChannel.class);
+
+ clientBootstrap
+ .handler(new ChannelInitializer<LocalChannel>() {
+ @Override
+ public void initChannel(final LocalChannel ch) throws Exception {
+ ch.pipeline().addLast(new SshProxyClientHandler(in, out, netconfHelloMessageAdditionalHeader, callback));
+ }
+ });
+ clientChannelFuture = clientBootstrap.connect(localAddress);
+ clientChannelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
+
+ @Override
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if(future.isSuccess()) {
+ clientChannel = clientChannelFuture.channel();
+ } else {
+ logger.warn("Unable to establish internal connection to netconf server for client: {}", getClientAddress());
+ Preconditions.checkNotNull(callback, "Exit callback must be set");
+ callback.onExit(1, "Unable to establish internal connection to netconf server for client: "+ getClientAddress());
+ }
+ }
+ });
+ }
+
+ @Override
+ public void destroy() {
+ logger.trace("Releasing internal connection to netconf server for client: {} on channel: {}",
+ getClientAddress(), clientChannel);
+
+ clientChannelFuture.cancel(true);
+ if(clientChannel != null) {
+ clientChannel.close().addListener(new GenericFutureListener<ChannelFuture>() {
+
+ @Override
+ public void operationComplete(final ChannelFuture future) throws Exception {
+ if (future.isSuccess() == false) {
+ logger.warn("Unable to release internal connection to netconf server on channel: {}", clientChannel);
+ }
+ }
+ });
+ }
+ }
+
+ private String getClientAddress() {
+ return netconfHelloMessageAdditionalHeader.getAddress();
+ }
+
+ @Override
+ public void setSession(final ServerSession session) {
+ final SocketAddress remoteAddress = session.getIoSession().getRemoteAddress();
+ String hostName = "";
+ String port = "";
+ if(remoteAddress instanceof InetSocketAddress) {
+ hostName = ((InetSocketAddress) remoteAddress).getAddress().getHostAddress();
+ port = Integer.toString(((InetSocketAddress) remoteAddress).getPort());
+ }
+ netconfHelloMessageAdditionalHeader = new NetconfHelloMessageAdditionalHeader(
+ session.getUsername(), hostName, port, "ssh", "client");
+ }
+
+ public static class NetconfCommandFactory implements NamedFactory<Command> {
+
+ public static final String NETCONF = "netconf";
+
+ private final EventLoopGroup clientBootstrap;
+ private final LocalAddress localAddress;
+
+ public NetconfCommandFactory(final EventLoopGroup clientBootstrap, final LocalAddress localAddress) {
+
+ this.clientBootstrap = clientBootstrap;
+ this.localAddress = localAddress;
+ }
+
+ @Override
+ public String getName() {
+ return NETCONF;
+ }
+
+ @Override
+ public RemoteNetconfCommand create() {
+ return new RemoteNetconfCommand(clientBootstrap, localAddress);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.server.ExitCallback;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerReader;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty handler that reads SSH from remote client and writes to delegate server and reads from delegate server and writes to remote client
+ */
+final class SshProxyClientHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(SshProxyClientHandler.class);
+
+ private final IoInputStream in;
+ private final IoOutputStream out;
+
+ private AsyncSshHandlerReader asyncSshHandlerReader;
+ private AsyncSshHandlerWriter asyncSshHandlerWriter;
+
+ private final NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader;
+ private final ExitCallback callback;
+
+ public SshProxyClientHandler(final IoInputStream in, final IoOutputStream out,
+ final NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader,
+ final ExitCallback callback) {
+ this.in = in;
+ this.out = out;
+ this.netconfHelloMessageAdditionalHeader = netconfHelloMessageAdditionalHeader;
+ this.callback = callback;
+ }
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ writeAdditionalHeader(ctx);
+
+ asyncSshHandlerWriter = new AsyncSshHandlerWriter(out);
+ asyncSshHandlerReader = new AsyncSshHandlerReader(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ // Close both sessions (delegate server and remote client)
+ ctx.fireChannelInactive();
+ ctx.disconnect();
+ ctx.close();
+ asyncSshHandlerReader.close();
+ asyncSshHandlerWriter.close();
+ }
+ }, new AsyncSshHandlerReader.ReadMsgHandler() {
+ @Override
+ public void onMessageRead(final ByteBuf msg) {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Forwarding message for client: {} on channel: {}, message: {}",
+ netconfHelloMessageAdditionalHeader.getAddress(), ctx.channel(), AsyncSshHandlerWriter.byteBufToString(msg));
+ }
+ // Just forward to delegate
+ ctx.writeAndFlush(msg);
+ }
+ }, "ssh" + netconfHelloMessageAdditionalHeader.getAddress(), in);
+
+
+ super.channelActive(ctx);
+ }
+
+ private void writeAdditionalHeader(final ChannelHandlerContext ctx) {
+ ctx.writeAndFlush(Unpooled.copiedBuffer(netconfHelloMessageAdditionalHeader.toFormattedString().getBytes()));
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+ asyncSshHandlerWriter.write(ctx, msg, ctx.newPromise());
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ logger.debug("Internal connection to netconf server was dropped for client: {} on channel: ",
+ netconfHelloMessageAdditionalHeader.getAddress(), ctx.channel());
+ callback.onExit(1, "Internal connection to netconf server was dropped for client: " +
+ netconfHelloMessageAdditionalHeader.getAddress() + " on channel: " + ctx.channel());
+ super.channelInactive(ctx);
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.sshd.SshServer;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.KeyPairProvider;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.io.IoAcceptor;
+import org.apache.sshd.common.io.IoConnector;
+import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoServiceFactory;
+import org.apache.sshd.common.io.IoServiceFactoryFactory;
+import org.apache.sshd.common.io.nio2.Nio2Acceptor;
+import org.apache.sshd.common.io.nio2.Nio2Connector;
+import org.apache.sshd.common.io.nio2.Nio2ServiceFactoryFactory;
+import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.PasswordAuthenticator;
+
+/**
+ * Proxy SSH server that just delegates decrypted content to a delegate server within same VM.
+ * Implemented using Apache Mina SSH lib.
+ */
+public class SshProxyServer implements AutoCloseable {
+
+ private final SshServer sshServer;
+ private final ScheduledExecutorService minaTimerExecutor;
+ private final EventLoopGroup clientGroup;
+ private final IoServiceFactoryFactory nioServiceWithPoolFactoryFactory;
+
+ public SshProxyServer(final ScheduledExecutorService minaTimerExecutor, final EventLoopGroup clientGroup, final ExecutorService nioExecutor) {
+ this.minaTimerExecutor = minaTimerExecutor;
+ this.clientGroup = clientGroup;
+ this.nioServiceWithPoolFactoryFactory = new NioServiceWithPoolFactory.NioServiceWithPoolFactoryFactory(nioExecutor);
+ this.sshServer = SshServer.setUpDefaultServer();
+ }
+
+ public void bind(final InetSocketAddress bindingAddress, final LocalAddress localAddress, final PasswordAuthenticator authenticator, final KeyPairProvider keyPairProvider) throws IOException {
+ sshServer.setHost(bindingAddress.getHostString());
+ sshServer.setPort(bindingAddress.getPort());
+
+ sshServer.setPasswordAuthenticator(authenticator);
+ sshServer.setKeyPairProvider(keyPairProvider);
+
+ sshServer.setIoServiceFactoryFactory(nioServiceWithPoolFactoryFactory);
+ sshServer.setScheduledExecutorService(minaTimerExecutor);
+
+ final RemoteNetconfCommand.NetconfCommandFactory netconfCommandFactory =
+ new RemoteNetconfCommand.NetconfCommandFactory(clientGroup, localAddress);
+ sshServer.setSubsystemFactories(Lists.<NamedFactory<Command>>newArrayList(netconfCommandFactory));
+ sshServer.start();
+ }
+
+ @Override
+ public void close() {
+ try {
+ sshServer.stop(true);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Interrupted while stopping sshServer", e);
+ } finally {
+ sshServer.close(true);
+ }
+ }
+
+ /**
+ * Based on Nio2ServiceFactory with one addition: injectable executor
+ */
+ private static final class NioServiceWithPoolFactory extends CloseableUtils.AbstractCloseable implements IoServiceFactory {
+
+ private final FactoryManager manager;
+ private final AsynchronousChannelGroup group;
+
+ public NioServiceWithPoolFactory(final FactoryManager manager, final ExecutorService executor) {
+ this.manager = manager;
+ try {
+ group = AsynchronousChannelGroup.withThreadPool(executor);
+ } catch (final IOException e) {
+ throw new RuntimeSshException(e);
+ }
+ }
+
+ public IoConnector createConnector(final IoHandler handler) {
+ return new Nio2Connector(manager, handler, group);
+ }
+
+ public IoAcceptor createAcceptor(final IoHandler handler) {
+ return new Nio2Acceptor(manager, handler, group);
+ }
+
+ @Override
+ protected void doCloseImmediately() {
+ try {
+ group.shutdownNow();
+ group.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ log.debug("Exception caught while closing channel group", e);
+ } finally {
+ super.doCloseImmediately();
+ }
+ }
+
+ private static final class NioServiceWithPoolFactoryFactory extends Nio2ServiceFactoryFactory {
+
+ private final ExecutorService nioExecutor;
+
+ private NioServiceWithPoolFactoryFactory(final ExecutorService nioExecutor) {
+ this.nioExecutor = nioExecutor;
+ }
+
+ @Override
+ public IoServiceFactory create(final FactoryManager manager) {
+ return new NioServiceWithPoolFactory(manager, nioExecutor);
+ }
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.ssh.osgi;
+
+import com.google.common.base.Preconditions;
+import org.apache.sshd.server.PasswordAuthenticator;
+import org.apache.sshd.server.session.ServerSession;
+import org.opendaylight.controller.netconf.auth.AuthConstants;
+import org.opendaylight.controller.netconf.auth.AuthProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AuthProviderTracker implements ServiceTrackerCustomizer<AuthProvider, AuthProvider>, PasswordAuthenticator {
+ private static final Logger logger = LoggerFactory.getLogger(AuthProviderTracker.class);
+
+ private final BundleContext bundleContext;
+
+ private Integer maxPreference;
+ private final ServiceTracker<AuthProvider, AuthProvider> listenerTracker;
+ private AuthProvider authProvider;
+
+ public AuthProviderTracker(final BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ listenerTracker = new ServiceTracker<>(bundleContext, AuthProvider.class, this);
+ listenerTracker.open();
+ }
+
+ @Override
+ public AuthProvider addingService(final ServiceReference<AuthProvider> reference) {
+ logger.trace("Service {} added", reference);
+ final AuthProvider authService = bundleContext.getService(reference);
+ final Integer newServicePreference = getPreference(reference);
+ if(isBetter(newServicePreference)) {
+ maxPreference = newServicePreference;
+ this.authProvider = authService;
+ }
+ return authService;
+ }
+
+ private Integer getPreference(final ServiceReference<AuthProvider> reference) {
+ final Object preferenceProperty = reference.getProperty(AuthConstants.SERVICE_PREFERENCE_KEY);
+ return preferenceProperty == null ? Integer.MIN_VALUE : Integer.valueOf(preferenceProperty.toString());
+ }
+
+ private boolean isBetter(final Integer newServicePreference) {
+ Preconditions.checkNotNull(newServicePreference);
+ if(maxPreference == null) {
+ return true;
+ }
+
+ return newServicePreference > maxPreference;
+ }
+
+ @Override
+ public void modifiedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
+ final AuthProvider authService = bundleContext.getService(reference);
+ final Integer newServicePreference = getPreference(reference);
+ if(isBetter(newServicePreference)) {
+ logger.trace("Replacing modified service {} in netconf SSH.", reference);
+ this.authProvider = authService;
+ }
+ }
+
+ @Override
+ public void removedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
+ logger.trace("Removing service {} from netconf SSH. " +
+ "SSH won't authenticate users until AuthProvider service will be started.", reference);
+ maxPreference = null;
+ this.authProvider = null;
+ }
+
+ public void stop() {
+ listenerTracker.close();
+ // sshThread should finish normally since sshServer.close stops processing
+ }
+
+ @Override
+ public boolean authenticate(final String username, final String password, final ServerSession session) {
+ return authProvider == null ? false : authProvider.authenticated(username, password);
+ }
+}
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Preconditions;
-import java.io.File;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import org.apache.commons.io.FilenameUtils;
-import org.opendaylight.controller.netconf.auth.AuthConstants;
-import org.opendaylight.controller.netconf.auth.AuthProvider;
-import org.opendaylight.controller.netconf.ssh.NetconfSSHServer;
-import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator;
+import org.apache.sshd.common.util.ThreadUtils;
+import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider;
+import org.opendaylight.controller.netconf.ssh.SshProxyServer;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil.InfixProp;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.local.LocalAddress;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-/**
- * Activator for netconf SSH bundle which creates SSH bridge between netconf client and netconf server. Activator
- * starts SSH Server in its own thread. This thread is closed when activator calls stop() method. Server opens socket
- * and listens for client connections. Each client connection creation is handled in separate
- * {@link org.opendaylight.controller.netconf.ssh.threads.Handshaker} thread.
- * This thread creates two additional threads {@link org.opendaylight.controller.netconf.ssh.threads.IOThread}
- * forwarding data from/to client.IOThread closes servers session and server connection when it gets -1 on input stream.
- * {@link org.opendaylight.controller.netconf.ssh.threads.IOThread}'s run method waits for -1 on input stream to finish.
- * All threads are daemons.
- */
public class NetconfSSHActivator implements BundleActivator {
private static final Logger logger = LoggerFactory.getLogger(NetconfSSHActivator.class);
- private static AuthProviderTracker authProviderTracker;
- private NetconfSSHServer server;
+ private static final java.lang.String ALGORITHM = "RSA";
+ private static final int KEY_SIZE = 4096;
+ public static final int POOL_SIZE = 8;
+
+ private ScheduledExecutorService minaTimerExecutor;
+ private NioEventLoopGroup clientGroup;
+ private ExecutorService nioExecutor;
+ private AuthProviderTracker authProviderTracker;
+
+ private SshProxyServer server;
@Override
public void start(final BundleContext bundleContext) throws IOException {
+ minaTimerExecutor = Executors.newScheduledThreadPool(POOL_SIZE, new ThreadFactory() {
+ @Override
+ public Thread newThread(final Runnable r) {
+ return new Thread(r, "netconf-ssh-server-mina-timers");
+ }
+ });
+ clientGroup = new NioEventLoopGroup();
+ nioExecutor = ThreadUtils.newFixedThreadPool("netconf-ssh-server-nio-group", POOL_SIZE);
server = startSSHServer(bundleContext);
}
if(authProviderTracker != null) {
authProviderTracker.stop();
}
+
+ if(nioExecutor!=null) {
+ nioExecutor.shutdownNow();
+ }
+
+ if(clientGroup != null) {
+ clientGroup.shutdownGracefully();
+ }
+
+ if(minaTimerExecutor != null) {
+ minaTimerExecutor.shutdownNow();
+ }
}
- private static NetconfSSHServer startSSHServer(final BundleContext bundleContext) throws IOException {
- final Optional<InetSocketAddress> maybeSshSocketAddress = NetconfConfigUtil.extractNetconfServerAddress(bundleContext,
- InfixProp.ssh);
+ private SshProxyServer startSSHServer(final BundleContext bundleContext) throws IOException {
+ final Optional<InetSocketAddress> maybeSshSocketAddress = NetconfConfigUtil.extractNetconfServerAddress(bundleContext, InfixProp.ssh);
if (maybeSshSocketAddress.isPresent() == false) {
logger.trace("SSH bridge not configured");
final LocalAddress localAddress = NetconfConfigUtil.getNetconfLocalAddress();
- final String path = FilenameUtils.separatorsToSystem(NetconfConfigUtil.getPrivateKeyPath(bundleContext));
- checkState(!Strings.isNullOrEmpty(path), "Path to ssh private key is blank. Reconfigure %s", NetconfConfigUtil.getPrivateKeyKey());
- final String privateKeyPEMString = PEMGenerator.readOrGeneratePK(new File(path));
-
- final EventLoopGroup bossGroup = new NioEventLoopGroup();
- final NetconfSSHServer server = NetconfSSHServer.start(sshSocketAddress.getPort(), localAddress, bossGroup, privateKeyPEMString.toCharArray());
-
- authProviderTracker = new AuthProviderTracker(bundleContext, server);
+ authProviderTracker = new AuthProviderTracker(bundleContext);
- return server;
- }
+ final String path = FilenameUtils.separatorsToSystem(NetconfConfigUtil.getPrivateKeyPath(bundleContext));
+ checkState(!Strings.isNullOrEmpty(path), "Path to ssh private key is blank. Reconfigure %s",
+ NetconfConfigUtil.getPrivateKeyKey());
- private static Thread runNetconfSshThread(final NetconfSSHServer server) {
- final Thread serverThread = new Thread(server, "netconf SSH server thread");
- serverThread.setDaemon(true);
- serverThread.start();
- logger.trace("Netconf SSH bridge up and running.");
- return serverThread;
+ final SshProxyServer sshProxyServer = new SshProxyServer(minaTimerExecutor, clientGroup, nioExecutor);
+ sshProxyServer.bind(sshSocketAddress, localAddress, authProviderTracker, new PEMGeneratorHostKeyProvider(path, ALGORITHM, KEY_SIZE));
+ return sshProxyServer;
}
- private static class AuthProviderTracker implements ServiceTrackerCustomizer<AuthProvider, AuthProvider> {
- private final BundleContext bundleContext;
- private final NetconfSSHServer server;
-
- private Integer maxPreference;
- private Thread sshThread;
- private final ServiceTracker<AuthProvider, AuthProvider> listenerTracker;
-
- public AuthProviderTracker(final BundleContext bundleContext, final NetconfSSHServer server) {
- this.bundleContext = bundleContext;
- this.server = server;
- listenerTracker = new ServiceTracker<>(bundleContext, AuthProvider.class, this);
- listenerTracker.open();
- }
-
- @Override
- public AuthProvider addingService(final ServiceReference<AuthProvider> reference) {
- logger.trace("Service {} added", reference);
- final AuthProvider authService = bundleContext.getService(reference);
- final Integer newServicePreference = getPreference(reference);
- if(isBetter(newServicePreference)) {
- maxPreference = newServicePreference;
- server.setAuthProvider(authService);
- if(sshThread == null) {
- sshThread = runNetconfSshThread(server);
- }
- }
- return authService;
- }
-
- private Integer getPreference(final ServiceReference<AuthProvider> reference) {
- final Object preferenceProperty = reference.getProperty(AuthConstants.SERVICE_PREFERENCE_KEY);
- return preferenceProperty == null ? Integer.MIN_VALUE : Integer.valueOf(preferenceProperty.toString());
- }
-
- private boolean isBetter(final Integer newServicePreference) {
- Preconditions.checkNotNull(newServicePreference);
- if(maxPreference == null) {
- return true;
- }
-
- return newServicePreference > maxPreference;
- }
-
- @Override
- public void modifiedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
- final AuthProvider authService = bundleContext.getService(reference);
- final Integer newServicePreference = getPreference(reference);
- if(isBetter(newServicePreference)) {
- logger.trace("Replacing modified service {} in netconf SSH.", reference);
- server.setAuthProvider(authService);
- }
- }
-
- @Override
- public void removedService(final ServiceReference<AuthProvider> reference, final AuthProvider service) {
- logger.trace("Removing service {} from netconf SSH. " +
- "SSH won't authenticate users until AuthProvider service will be started.", reference);
- maxPreference = null;
- server.setAuthProvider(null);
- }
-
- public void stop() {
- listenerTracker.close();
- // sshThread should finish normally since sshServer.close stops processing
- }
-
- }
}