From: Tony Tkacik Date: Mon, 27 Oct 2014 09:17:14 +0000 (+0000) Subject: Merge "BUG-1612 Implement mina ssh netconf server endpoint" X-Git-Tag: release/lithium~976 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=96171122685765f15a6faf0cc6f919221224870c;hp=8600573a11d7a0fa1c03e6c3f014473096e6a97c Merge "BUG-1612 Implement mina ssh netconf server endpoint" --- diff --git a/features/netconf-connector/pom.xml b/features/netconf-connector/pom.xml index 03d6fed605..b44fa11657 100644 --- a/features/netconf-connector/pom.xml +++ b/features/netconf-connector/pom.xml @@ -162,12 +162,13 @@ Optional TODO: Remove TODO comments. --> - - org.opendaylight.yangtools - features-test - ${yangtools.version} - test - + + + + + + + org.opendaylight.controller diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java index 1adcd7e491..f96f557619 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java @@ -21,14 +21,23 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.sshd.server.PasswordAuthenticator; +import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider; +import org.apache.sshd.server.session.ServerSession; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -40,13 +49,12 @@ import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; import org.opendaylight.controller.netconf.client.SimpleNetconfClientSessionListener; +import org.opendaylight.controller.netconf.client.TestingNetconfClient; import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; -import org.opendaylight.controller.netconf.client.TestingNetconfClient; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword; -import org.opendaylight.controller.netconf.ssh.NetconfSSHServer; -import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; +import org.opendaylight.controller.netconf.ssh.SshProxyServer; import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -68,19 +76,32 @@ public class NetconfITSecureTest extends AbstractNetconfConfigTest { public static final String USERNAME = "user"; public static final String PASSWORD = "pwd"; - private NetconfSSHServer sshServer; + private SshProxyServer sshProxyServer; + + private ExecutorService nioExec; + private EventLoopGroup clientGroup; + private ScheduledExecutorService minaTimerEx; @Before public void setUp() throws Exception { - final char[] pem = PEMGenerator.generate().toCharArray(); - sshServer = NetconfSSHServer.start(TLS_ADDRESS.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), getNettyThreadgroup(), pem); - sshServer.setAuthProvider(getAuthProvider()); + nioExec = Executors.newFixedThreadPool(1); + clientGroup = new NioEventLoopGroup(); + minaTimerEx = Executors.newScheduledThreadPool(1); + sshProxyServer = new SshProxyServer(minaTimerEx, clientGroup, nioExec); + sshProxyServer.bind(TLS_ADDRESS, NetconfConfigUtil.getNetconfLocalAddress(), new PasswordAuthenticator() { + @Override + public boolean authenticate(final String username, final String password, final ServerSession session) { + return true; + } + }, new PEMGeneratorHostKeyProvider(Files.createTempFile("prefix", "suffix").toAbsolutePath().toString())); } @After public void tearDown() throws Exception { - sshServer.close(); - sshServer.join(); + sshProxyServer.close(); + clientGroup.shutdownGracefully().await(); + minaTimerEx.shutdownNow(); + nioExec.shutdownNow(); } @Test diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 3bd7232023..fa7d0900ed 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -8,12 +8,9 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; -import com.google.common.base.Preconditions; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; import java.io.IOException; import java.net.SocketAddress; + import org.apache.sshd.ClientChannel; import org.apache.sshd.ClientSession; import org.apache.sshd.SshClient; @@ -26,6 +23,13 @@ import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication. import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + /** * Netty SSH handler class. Acts as interface between Netty and SSH library. */ @@ -47,7 +51,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final AuthenticationHandler authenticationHandler; private final SshClient sshClient; - private AsyncSshHanderReader sshReadAsyncListener; + private AsyncSshHandlerReader sshReadAsyncListener; private AsyncSshHandlerWriter sshWriteAsyncHandler; private ClientChannel channel; @@ -138,7 +142,20 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise.setSuccess(); connectPromise = null; - sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut()); + // TODO we should also read from error stream and at least log from that + + sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() { + @Override + public void close() throws Exception { + AsyncSshHandler.this.disconnect(ctx, ctx.newPromise()); + } + }, new AsyncSshHandlerReader.ReadMsgHandler() { + @Override + public void onMessageRead(final ByteBuf msg) { + ctx.fireChannelRead(msg); + } + }, channel.toString(), channel.getAsyncOut()); + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null if(channel != null) { sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn()); diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java similarity index 66% rename from opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java rename to opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java index 73a24f27b2..ada15583cd 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java @@ -8,9 +8,8 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandler; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoInputStream; import org.apache.sshd.common.io.IoReadFuture; @@ -22,22 +21,24 @@ import org.slf4j.LoggerFactory; * Listener on async input stream from SSH session. * This listeners schedules reads in a loop until the session is closed or read fails. */ -final class AsyncSshHanderReader implements SshFutureListener, AutoCloseable { +public final class AsyncSshHandlerReader implements SshFutureListener, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class); private static final int BUFFER_SIZE = 8192; - private final ChannelOutboundHandler asyncSshHandler; - private final ChannelHandlerContext ctx; + private final AutoCloseable connectionClosedCallback; + private final ReadMsgHandler readHandler; + private final String channelId; private IoInputStream asyncOut; private Buffer buf; private IoReadFuture currentReadFuture; - public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { - this.asyncSshHandler = asyncSshHandler; - this.ctx = ctx; + public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) { + this.connectionClosedCallback = connectionClosedCallback; + this.readHandler = readHandler; + this.channelId = channelId; this.asyncOut = asyncOut; buf = new Buffer(BUFFER_SIZE); asyncOut.read(buf).addListener(this); @@ -48,16 +49,20 @@ final class AsyncSshHanderReader implements SshFutureListener, Aut if(future.getException() != null) { if(asyncOut.isClosed() || asyncOut.isClosing()) { // Ssh dropped - logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); + logger.debug("Ssh session dropped on channel: {}", channelId, future.getException()); } else { - logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); + logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException()); } invokeDisconnect(); return; } if (future.getRead() > 0) { - ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead())); + final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()); + if(logger.isTraceEnabled()) { + logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg)); + } + readHandler.onMessageRead(msg); // Schedule next read buf = new Buffer(BUFFER_SIZE); @@ -68,7 +73,7 @@ final class AsyncSshHanderReader implements SshFutureListener, Aut private void invokeDisconnect() { try { - asyncSshHandler.disconnect(ctx, ctx.newPromise()); + connectionClosedCallback.close(); } catch (final Exception e) { // This should not happen throw new IllegalStateException(e); @@ -80,8 +85,14 @@ final class AsyncSshHanderReader implements SshFutureListener, Aut // Remove self as listener on close to prevent reading from closed input if(currentReadFuture != null) { currentReadFuture.removeListener(this); + currentReadFuture = null; } asyncOut = null; } + + public interface ReadMsgHandler { + + void onMessageRead(ByteBuf msg); + } } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index eace0ac7ea..8e639bd47c 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server. * Also handles pending writes by caching requests until pending state is over. */ -final class AsyncSshHandlerWriter implements AutoCloseable { +public final class AsyncSshHandlerWriter implements AutoCloseable { private static final Logger logger = LoggerFactory .getLogger(AsyncSshHandlerWriter.class); @@ -116,7 +116,7 @@ final class AsyncSshHandlerWriter implements AutoCloseable { writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg); } - private static String byteBufToString(final ByteBuf msg) { + public static String byteBufToString(final ByteBuf msg) { msg.resetReaderIndex(); final String s = msg.toString(Charsets.UTF_8); msg.resetReaderIndex(); diff --git a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java index d0fc43d04a..212eabb290 100644 --- a/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java +++ b/opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java @@ -459,6 +459,8 @@ public class AsyncSshHandlerTest { private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut, final IoOutputStream asyncIn) throws IOException { final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class); + doReturn("subsystemChannel").when(subsystemChannel).toString(); + doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class)); final OpenFuture openFuture = mock(OpenFuture.class); diff --git a/opendaylight/netconf/netconf-ssh/pom.xml b/opendaylight/netconf/netconf-ssh/pom.xml index 221626b741..78453b1770 100644 --- a/opendaylight/netconf/netconf-ssh/pom.xml +++ b/opendaylight/netconf/netconf-ssh/pom.xml @@ -60,7 +60,6 @@ org.opendaylight.controller netconf-netty-util - test org.opendaylight.controller diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/RemoteNetconfCommand.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/RemoteNetconfCommand.java new file mode 100644 index 0000000000..e642e073a3 --- /dev/null +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/RemoteNetconfCommand.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.ssh; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.util.concurrent.GenericFutureListener; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoOutputStream; +import org.apache.sshd.server.AsyncCommand; +import org.apache.sshd.server.Command; +import org.apache.sshd.server.Environment; +import org.apache.sshd.server.ExitCallback; +import org.apache.sshd.server.SessionAware; +import org.apache.sshd.server.session.ServerSession; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This command handles all netconf related rpc and forwards to delegate server. + * Uses netty to make a local connection to delegate server. + * + * Command is Apache Mina SSH terminology for objects handling ssh data. + */ +public class RemoteNetconfCommand implements AsyncCommand, SessionAware { + + private static final Logger logger = LoggerFactory.getLogger(RemoteNetconfCommand.class); + + private final EventLoopGroup clientEventGroup; + private final LocalAddress localAddress; + + private IoInputStream in; + private IoOutputStream out; + private ExitCallback callback; + private NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader; + + private Channel clientChannel; + private ChannelFuture clientChannelFuture; + + public RemoteNetconfCommand(final EventLoopGroup clientEventGroup, final LocalAddress localAddress) { + this.clientEventGroup = clientEventGroup; + this.localAddress = localAddress; + } + + @Override + public void setIoInputStream(final IoInputStream in) { + this.in = in; + } + + @Override + public void setIoOutputStream(final IoOutputStream out) { + this.out = out; + } + + @Override + public void setIoErrorStream(final IoOutputStream err) { + // TODO do we want to use error stream in some way ? + } + + @Override + public void setInputStream(final InputStream in) { + throw new UnsupportedOperationException("Synchronous IO is unsupported"); + } + + @Override + public void setOutputStream(final OutputStream out) { + throw new UnsupportedOperationException("Synchronous IO is unsupported"); + + } + + @Override + public void setErrorStream(final OutputStream err) { + throw new UnsupportedOperationException("Synchronous IO is unsupported"); + + } + + @Override + public void setExitCallback(final ExitCallback callback) { + this.callback = callback; + } + + @Override + public void start(final Environment env) throws IOException { + logger.trace("Establishing internal connection to netconf server for client: {}", getClientAddress()); + + final Bootstrap clientBootstrap = new Bootstrap(); + clientBootstrap.group(clientEventGroup).channel(LocalChannel.class); + + clientBootstrap + .handler(new ChannelInitializer() { + @Override + public void initChannel(final LocalChannel ch) throws Exception { + ch.pipeline().addLast(new SshProxyClientHandler(in, out, netconfHelloMessageAdditionalHeader, callback)); + } + }); + clientChannelFuture = clientBootstrap.connect(localAddress); + clientChannelFuture.addListener(new GenericFutureListener() { + + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + if(future.isSuccess()) { + clientChannel = clientChannelFuture.channel(); + } else { + logger.warn("Unable to establish internal connection to netconf server for client: {}", getClientAddress()); + Preconditions.checkNotNull(callback, "Exit callback must be set"); + callback.onExit(1, "Unable to establish internal connection to netconf server for client: "+ getClientAddress()); + } + } + }); + } + + @Override + public void destroy() { + logger.trace("Releasing internal connection to netconf server for client: {} on channel: {}", + getClientAddress(), clientChannel); + + clientChannelFuture.cancel(true); + if(clientChannel != null) { + clientChannel.close().addListener(new GenericFutureListener() { + + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + if (future.isSuccess() == false) { + logger.warn("Unable to release internal connection to netconf server on channel: {}", clientChannel); + } + } + }); + } + } + + private String getClientAddress() { + return netconfHelloMessageAdditionalHeader.getAddress(); + } + + @Override + public void setSession(final ServerSession session) { + final SocketAddress remoteAddress = session.getIoSession().getRemoteAddress(); + String hostName = ""; + String port = ""; + if(remoteAddress instanceof InetSocketAddress) { + hostName = ((InetSocketAddress) remoteAddress).getAddress().getHostAddress(); + port = Integer.toString(((InetSocketAddress) remoteAddress).getPort()); + } + netconfHelloMessageAdditionalHeader = new NetconfHelloMessageAdditionalHeader( + session.getUsername(), hostName, port, "ssh", "client"); + } + + public static class NetconfCommandFactory implements NamedFactory { + + public static final String NETCONF = "netconf"; + + private final EventLoopGroup clientBootstrap; + private final LocalAddress localAddress; + + public NetconfCommandFactory(final EventLoopGroup clientBootstrap, final LocalAddress localAddress) { + + this.clientBootstrap = clientBootstrap; + this.localAddress = localAddress; + } + + @Override + public String getName() { + return NETCONF; + } + + @Override + public RemoteNetconfCommand create() { + return new RemoteNetconfCommand(clientBootstrap, localAddress); + } + } + +} diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyClientHandler.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyClientHandler.java new file mode 100644 index 0000000000..2b2b3b3e81 --- /dev/null +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyClientHandler.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.ssh; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoOutputStream; +import org.apache.sshd.server.ExitCallback; +import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerReader; +import org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Netty handler that reads SSH from remote client and writes to delegate server and reads from delegate server and writes to remote client + */ +final class SshProxyClientHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(SshProxyClientHandler.class); + + private final IoInputStream in; + private final IoOutputStream out; + + private AsyncSshHandlerReader asyncSshHandlerReader; + private AsyncSshHandlerWriter asyncSshHandlerWriter; + + private final NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader; + private final ExitCallback callback; + + public SshProxyClientHandler(final IoInputStream in, final IoOutputStream out, + final NetconfHelloMessageAdditionalHeader netconfHelloMessageAdditionalHeader, + final ExitCallback callback) { + this.in = in; + this.out = out; + this.netconfHelloMessageAdditionalHeader = netconfHelloMessageAdditionalHeader; + this.callback = callback; + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + writeAdditionalHeader(ctx); + + asyncSshHandlerWriter = new AsyncSshHandlerWriter(out); + asyncSshHandlerReader = new AsyncSshHandlerReader(new AutoCloseable() { + @Override + public void close() throws Exception { + // Close both sessions (delegate server and remote client) + ctx.fireChannelInactive(); + ctx.disconnect(); + ctx.close(); + asyncSshHandlerReader.close(); + asyncSshHandlerWriter.close(); + } + }, new AsyncSshHandlerReader.ReadMsgHandler() { + @Override + public void onMessageRead(final ByteBuf msg) { + if(logger.isTraceEnabled()) { + logger.trace("Forwarding message for client: {} on channel: {}, message: {}", + netconfHelloMessageAdditionalHeader.getAddress(), ctx.channel(), AsyncSshHandlerWriter.byteBufToString(msg)); + } + // Just forward to delegate + ctx.writeAndFlush(msg); + } + }, "ssh" + netconfHelloMessageAdditionalHeader.getAddress(), in); + + + super.channelActive(ctx); + } + + private void writeAdditionalHeader(final ChannelHandlerContext ctx) { + ctx.writeAndFlush(Unpooled.copiedBuffer(netconfHelloMessageAdditionalHeader.toFormattedString().getBytes())); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + asyncSshHandlerWriter.write(ctx, msg, ctx.newPromise()); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + logger.debug("Internal connection to netconf server was dropped for client: {} on channel: ", + netconfHelloMessageAdditionalHeader.getAddress(), ctx.channel()); + callback.onExit(1, "Internal connection to netconf server was dropped for client: " + + netconfHelloMessageAdditionalHeader.getAddress() + " on channel: " + ctx.channel()); + super.channelInactive(ctx); + } + + +} diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyServer.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyServer.java new file mode 100644 index 0000000000..0b85cf2653 --- /dev/null +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/SshProxyServer.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.ssh; + +import com.google.common.collect.Lists; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousChannelGroup; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.sshd.SshServer; +import org.apache.sshd.common.FactoryManager; +import org.apache.sshd.common.KeyPairProvider; +import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.common.RuntimeSshException; +import org.apache.sshd.common.io.IoAcceptor; +import org.apache.sshd.common.io.IoConnector; +import org.apache.sshd.common.io.IoHandler; +import org.apache.sshd.common.io.IoServiceFactory; +import org.apache.sshd.common.io.IoServiceFactoryFactory; +import org.apache.sshd.common.io.nio2.Nio2Acceptor; +import org.apache.sshd.common.io.nio2.Nio2Connector; +import org.apache.sshd.common.io.nio2.Nio2ServiceFactoryFactory; +import org.apache.sshd.common.util.CloseableUtils; +import org.apache.sshd.server.Command; +import org.apache.sshd.server.PasswordAuthenticator; + +/** + * Proxy SSH server that just delegates decrypted content to a delegate server within same VM. + * Implemented using Apache Mina SSH lib. + */ +public class SshProxyServer implements AutoCloseable { + + private final SshServer sshServer; + private final ScheduledExecutorService minaTimerExecutor; + private final EventLoopGroup clientGroup; + private final IoServiceFactoryFactory nioServiceWithPoolFactoryFactory; + + public SshProxyServer(final ScheduledExecutorService minaTimerExecutor, final EventLoopGroup clientGroup, final ExecutorService nioExecutor) { + this.minaTimerExecutor = minaTimerExecutor; + this.clientGroup = clientGroup; + this.nioServiceWithPoolFactoryFactory = new NioServiceWithPoolFactory.NioServiceWithPoolFactoryFactory(nioExecutor); + this.sshServer = SshServer.setUpDefaultServer(); + } + + public void bind(final InetSocketAddress bindingAddress, final LocalAddress localAddress, final PasswordAuthenticator authenticator, final KeyPairProvider keyPairProvider) throws IOException { + sshServer.setHost(bindingAddress.getHostString()); + sshServer.setPort(bindingAddress.getPort()); + + sshServer.setPasswordAuthenticator(authenticator); + sshServer.setKeyPairProvider(keyPairProvider); + + sshServer.setIoServiceFactoryFactory(nioServiceWithPoolFactoryFactory); + sshServer.setScheduledExecutorService(minaTimerExecutor); + + final RemoteNetconfCommand.NetconfCommandFactory netconfCommandFactory = + new RemoteNetconfCommand.NetconfCommandFactory(clientGroup, localAddress); + sshServer.setSubsystemFactories(Lists.>newArrayList(netconfCommandFactory)); + sshServer.start(); + } + + @Override + public void close() { + try { + sshServer.stop(true); + } catch (final InterruptedException e) { + throw new RuntimeException("Interrupted while stopping sshServer", e); + } finally { + sshServer.close(true); + } + } + + /** + * Based on Nio2ServiceFactory with one addition: injectable executor + */ + private static final class NioServiceWithPoolFactory extends CloseableUtils.AbstractCloseable implements IoServiceFactory { + + private final FactoryManager manager; + private final AsynchronousChannelGroup group; + + public NioServiceWithPoolFactory(final FactoryManager manager, final ExecutorService executor) { + this.manager = manager; + try { + group = AsynchronousChannelGroup.withThreadPool(executor); + } catch (final IOException e) { + throw new RuntimeSshException(e); + } + } + + public IoConnector createConnector(final IoHandler handler) { + return new Nio2Connector(manager, handler, group); + } + + public IoAcceptor createAcceptor(final IoHandler handler) { + return new Nio2Acceptor(manager, handler, group); + } + + @Override + protected void doCloseImmediately() { + try { + group.shutdownNow(); + group.awaitTermination(5, TimeUnit.SECONDS); + } catch (final Exception e) { + log.debug("Exception caught while closing channel group", e); + } finally { + super.doCloseImmediately(); + } + } + + private static final class NioServiceWithPoolFactoryFactory extends Nio2ServiceFactoryFactory { + + private final ExecutorService nioExecutor; + + private NioServiceWithPoolFactoryFactory(final ExecutorService nioExecutor) { + this.nioExecutor = nioExecutor; + } + + @Override + public IoServiceFactory create(final FactoryManager manager) { + return new NioServiceWithPoolFactory(manager, nioExecutor); + } + } + } + +} diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/AuthProviderTracker.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/AuthProviderTracker.java new file mode 100644 index 0000000000..97e611c0d2 --- /dev/null +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/AuthProviderTracker.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.ssh.osgi; + +import com.google.common.base.Preconditions; +import org.apache.sshd.server.PasswordAuthenticator; +import org.apache.sshd.server.session.ServerSession; +import org.opendaylight.controller.netconf.auth.AuthConstants; +import org.opendaylight.controller.netconf.auth.AuthProvider; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AuthProviderTracker implements ServiceTrackerCustomizer, PasswordAuthenticator { + private static final Logger logger = LoggerFactory.getLogger(AuthProviderTracker.class); + + private final BundleContext bundleContext; + + private Integer maxPreference; + private final ServiceTracker listenerTracker; + private AuthProvider authProvider; + + public AuthProviderTracker(final BundleContext bundleContext) { + this.bundleContext = bundleContext; + listenerTracker = new ServiceTracker<>(bundleContext, AuthProvider.class, this); + listenerTracker.open(); + } + + @Override + public AuthProvider addingService(final ServiceReference reference) { + logger.trace("Service {} added", reference); + final AuthProvider authService = bundleContext.getService(reference); + final Integer newServicePreference = getPreference(reference); + if(isBetter(newServicePreference)) { + maxPreference = newServicePreference; + this.authProvider = authService; + } + return authService; + } + + private Integer getPreference(final ServiceReference reference) { + final Object preferenceProperty = reference.getProperty(AuthConstants.SERVICE_PREFERENCE_KEY); + return preferenceProperty == null ? Integer.MIN_VALUE : Integer.valueOf(preferenceProperty.toString()); + } + + private boolean isBetter(final Integer newServicePreference) { + Preconditions.checkNotNull(newServicePreference); + if(maxPreference == null) { + return true; + } + + return newServicePreference > maxPreference; + } + + @Override + public void modifiedService(final ServiceReference reference, final AuthProvider service) { + final AuthProvider authService = bundleContext.getService(reference); + final Integer newServicePreference = getPreference(reference); + if(isBetter(newServicePreference)) { + logger.trace("Replacing modified service {} in netconf SSH.", reference); + this.authProvider = authService; + } + } + + @Override + public void removedService(final ServiceReference reference, final AuthProvider service) { + logger.trace("Removing service {} from netconf SSH. " + + "SSH won't authenticate users until AuthProvider service will be started.", reference); + maxPreference = null; + this.authProvider = null; + } + + public void stop() { + listenerTracker.close(); + // sshThread should finish normally since sshServer.close stops processing + } + + @Override + public boolean authenticate(final String username, final String password, final ServerSession session) { + return authProvider == null ? false : authProvider.authenticated(username, password); + } +} diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java index 0d0f95c3cb..b871d19db8 100644 --- a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/osgi/NetconfSSHActivator.java @@ -9,51 +9,51 @@ package org.opendaylight.controller.netconf.ssh.osgi; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Preconditions; -import java.io.File; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.nio.NioEventLoopGroup; import java.io.IOException; import java.net.InetSocketAddress; - +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import org.apache.commons.io.FilenameUtils; -import org.opendaylight.controller.netconf.auth.AuthConstants; -import org.opendaylight.controller.netconf.auth.AuthProvider; -import org.opendaylight.controller.netconf.ssh.NetconfSSHServer; -import org.opendaylight.controller.netconf.ssh.authentication.PEMGenerator; +import org.apache.sshd.common.util.ThreadUtils; +import org.apache.sshd.server.keyprovider.PEMGeneratorHostKeyProvider; +import org.opendaylight.controller.netconf.ssh.SshProxyServer; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil.InfixProp; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.util.tracker.ServiceTrackerCustomizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.base.Strings; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.nio.NioEventLoopGroup; - -/** - * Activator for netconf SSH bundle which creates SSH bridge between netconf client and netconf server. Activator - * starts SSH Server in its own thread. This thread is closed when activator calls stop() method. Server opens socket - * and listens for client connections. Each client connection creation is handled in separate - * {@link org.opendaylight.controller.netconf.ssh.threads.Handshaker} thread. - * This thread creates two additional threads {@link org.opendaylight.controller.netconf.ssh.threads.IOThread} - * forwarding data from/to client.IOThread closes servers session and server connection when it gets -1 on input stream. - * {@link org.opendaylight.controller.netconf.ssh.threads.IOThread}'s run method waits for -1 on input stream to finish. - * All threads are daemons. - */ public class NetconfSSHActivator implements BundleActivator { private static final Logger logger = LoggerFactory.getLogger(NetconfSSHActivator.class); - private static AuthProviderTracker authProviderTracker; - private NetconfSSHServer server; + private static final java.lang.String ALGORITHM = "RSA"; + private static final int KEY_SIZE = 4096; + public static final int POOL_SIZE = 8; + + private ScheduledExecutorService minaTimerExecutor; + private NioEventLoopGroup clientGroup; + private ExecutorService nioExecutor; + private AuthProviderTracker authProviderTracker; + + private SshProxyServer server; @Override public void start(final BundleContext bundleContext) throws IOException { + minaTimerExecutor = Executors.newScheduledThreadPool(POOL_SIZE, new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + return new Thread(r, "netconf-ssh-server-mina-timers"); + } + }); + clientGroup = new NioEventLoopGroup(); + nioExecutor = ThreadUtils.newFixedThreadPool("netconf-ssh-server-nio-group", POOL_SIZE); server = startSSHServer(bundleContext); } @@ -66,11 +66,22 @@ public class NetconfSSHActivator implements BundleActivator { if(authProviderTracker != null) { authProviderTracker.stop(); } + + if(nioExecutor!=null) { + nioExecutor.shutdownNow(); + } + + if(clientGroup != null) { + clientGroup.shutdownGracefully(); + } + + if(minaTimerExecutor != null) { + minaTimerExecutor.shutdownNow(); + } } - private static NetconfSSHServer startSSHServer(final BundleContext bundleContext) throws IOException { - final Optional maybeSshSocketAddress = NetconfConfigUtil.extractNetconfServerAddress(bundleContext, - InfixProp.ssh); + private SshProxyServer startSSHServer(final BundleContext bundleContext) throws IOException { + final Optional maybeSshSocketAddress = NetconfConfigUtil.extractNetconfServerAddress(bundleContext, InfixProp.ssh); if (maybeSshSocketAddress.isPresent() == false) { logger.trace("SSH bridge not configured"); @@ -82,92 +93,15 @@ public class NetconfSSHActivator implements BundleActivator { final LocalAddress localAddress = NetconfConfigUtil.getNetconfLocalAddress(); - final String path = FilenameUtils.separatorsToSystem(NetconfConfigUtil.getPrivateKeyPath(bundleContext)); - checkState(!Strings.isNullOrEmpty(path), "Path to ssh private key is blank. Reconfigure %s", NetconfConfigUtil.getPrivateKeyKey()); - final String privateKeyPEMString = PEMGenerator.readOrGeneratePK(new File(path)); - - final EventLoopGroup bossGroup = new NioEventLoopGroup(); - final NetconfSSHServer server = NetconfSSHServer.start(sshSocketAddress.getPort(), localAddress, bossGroup, privateKeyPEMString.toCharArray()); - - authProviderTracker = new AuthProviderTracker(bundleContext, server); + authProviderTracker = new AuthProviderTracker(bundleContext); - return server; - } + final String path = FilenameUtils.separatorsToSystem(NetconfConfigUtil.getPrivateKeyPath(bundleContext)); + checkState(!Strings.isNullOrEmpty(path), "Path to ssh private key is blank. Reconfigure %s", + NetconfConfigUtil.getPrivateKeyKey()); - private static Thread runNetconfSshThread(final NetconfSSHServer server) { - final Thread serverThread = new Thread(server, "netconf SSH server thread"); - serverThread.setDaemon(true); - serverThread.start(); - logger.trace("Netconf SSH bridge up and running."); - return serverThread; + final SshProxyServer sshProxyServer = new SshProxyServer(minaTimerExecutor, clientGroup, nioExecutor); + sshProxyServer.bind(sshSocketAddress, localAddress, authProviderTracker, new PEMGeneratorHostKeyProvider(path, ALGORITHM, KEY_SIZE)); + return sshProxyServer; } - private static class AuthProviderTracker implements ServiceTrackerCustomizer { - private final BundleContext bundleContext; - private final NetconfSSHServer server; - - private Integer maxPreference; - private Thread sshThread; - private final ServiceTracker listenerTracker; - - public AuthProviderTracker(final BundleContext bundleContext, final NetconfSSHServer server) { - this.bundleContext = bundleContext; - this.server = server; - listenerTracker = new ServiceTracker<>(bundleContext, AuthProvider.class, this); - listenerTracker.open(); - } - - @Override - public AuthProvider addingService(final ServiceReference reference) { - logger.trace("Service {} added", reference); - final AuthProvider authService = bundleContext.getService(reference); - final Integer newServicePreference = getPreference(reference); - if(isBetter(newServicePreference)) { - maxPreference = newServicePreference; - server.setAuthProvider(authService); - if(sshThread == null) { - sshThread = runNetconfSshThread(server); - } - } - return authService; - } - - private Integer getPreference(final ServiceReference reference) { - final Object preferenceProperty = reference.getProperty(AuthConstants.SERVICE_PREFERENCE_KEY); - return preferenceProperty == null ? Integer.MIN_VALUE : Integer.valueOf(preferenceProperty.toString()); - } - - private boolean isBetter(final Integer newServicePreference) { - Preconditions.checkNotNull(newServicePreference); - if(maxPreference == null) { - return true; - } - - return newServicePreference > maxPreference; - } - - @Override - public void modifiedService(final ServiceReference reference, final AuthProvider service) { - final AuthProvider authService = bundleContext.getService(reference); - final Integer newServicePreference = getPreference(reference); - if(isBetter(newServicePreference)) { - logger.trace("Replacing modified service {} in netconf SSH.", reference); - server.setAuthProvider(authService); - } - } - - @Override - public void removedService(final ServiceReference reference, final AuthProvider service) { - logger.trace("Removing service {} from netconf SSH. " + - "SSH won't authenticate users until AuthProvider service will be started.", reference); - maxPreference = null; - server.setAuthProvider(null); - } - - public void stop() { - listenerTracker.close(); - // sshThread should finish normally since sshServer.close stops processing - } - - } }