X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fnetconf%2Fnetconf-ssh%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fssh%2Fthreads%2FHandshaker.java;fp=opendaylight%2Fnetconf%2Fnetconf-ssh%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fssh%2Fthreads%2FHandshaker.java;h=0000000000000000000000000000000000000000;hb=74e4bbe4c6f47980dfad1028da6cc5d0bd7ef326;hp=eec6c3a0971d8a3369ee05a863540bed4b85f8b2;hpb=720b039093db2268d6dfeed364e685d34d6c62bf;p=controller.git diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java deleted file mode 100644 index eec6c3a097..0000000000 --- a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.netconf.ssh.threads; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; - -import javax.annotation.concurrent.NotThreadSafe; -import javax.annotation.concurrent.ThreadSafe; - -import org.opendaylight.controller.netconf.auth.AuthProvider; -import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import ch.ethz.ssh2.AuthenticationResult; -import ch.ethz.ssh2.PtySettings; -import ch.ethz.ssh2.ServerAuthenticationCallback; -import ch.ethz.ssh2.ServerConnection; -import ch.ethz.ssh2.ServerConnectionCallback; -import ch.ethz.ssh2.ServerSession; -import ch.ethz.ssh2.ServerSessionCallback; -import ch.ethz.ssh2.SimpleServerSessionCallback; - -import com.google.common.base.Supplier; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufProcessor; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.handler.stream.ChunkedStream; - -/** - * One instance represents per connection, responsible for ssh handshake. - * Once auth succeeds and correct subsystem is chosen, backend connection with - * netty netconf server is made. This task finishes right after negotiation is done. - */ -@ThreadSafe -public class Handshaker implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(Handshaker.class); - - private final ServerConnection ganymedConnection; - private final String session; - - - public Handshaker(Socket socket, LocalAddress localAddress, long sessionId, AuthProvider authProvider, - EventLoopGroup bossGroup, final char[] pem) throws IOException { - - this.session = "Session " + sessionId; - - String remoteAddressWithPort = socket.getRemoteSocketAddress().toString().replace("/", ""); - logger.debug("{} started with {}", session, remoteAddressWithPort); - String remoteAddress, remotePort; - if (remoteAddressWithPort.contains(":")) { - String[] split = remoteAddressWithPort.split(":"); - remoteAddress = split[0]; - remotePort = split[1]; - } else { - remoteAddress = remoteAddressWithPort; - remotePort = ""; - } - ServerAuthenticationCallbackImpl serverAuthenticationCallback = new ServerAuthenticationCallbackImpl( - authProvider, session); - - ganymedConnection = new ServerConnection(socket); - - ServerConnectionCallbackImpl serverConnectionCallback = new ServerConnectionCallbackImpl( - serverAuthenticationCallback, remoteAddress, remotePort, session, - getGanymedAutoCloseable(ganymedConnection), localAddress, bossGroup); - - // initialize ganymed - ganymedConnection.setPEMHostKey(pem, null); - ganymedConnection.setAuthenticationCallback(serverAuthenticationCallback); - ganymedConnection.setServerConnectionCallback(serverConnectionCallback); - } - - - private static AutoCloseable getGanymedAutoCloseable(final ServerConnection ganymedConnection) { - return new AutoCloseable() { - @Override - public void close() throws Exception { - ganymedConnection.close(); - } - }; - } - - @Override - public void run() { - // let ganymed process handshake - logger.trace("{} is started", session); - try { - // TODO this should be guarded with a timer to prevent resource exhaustion - ganymedConnection.connect(); - } catch (IOException e) { - logger.debug("{} connection error", session, e); - } - logger.trace("{} is exiting", session); - } -} - -/** - * Netty client handler that forwards bytes from backed server to supplied output stream. - * When backend server closes the connection, remoteConnection.close() is called to tear - * down ssh connection. - */ -class SSHClientHandler extends ChannelInboundHandlerAdapter { - private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class); - private final AutoCloseable remoteConnection; - private final BufferedOutputStream remoteOutputStream; - private final String session; - private ChannelHandlerContext channelHandlerContext; - - public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream, - String session) { - this.remoteConnection = remoteConnection; - this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream); - this.session = session; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) { - this.channelHandlerContext = ctx; - logger.debug("{} Client active", session); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { - ByteBuf bb = (ByteBuf) msg; - // we can block the server here so that slow client does not cause memory pressure - try { - bb.forEachByte(new ByteBufProcessor() { - @Override - public boolean process(byte value) throws Exception { - remoteOutputStream.write(value); - return true; - } - }); - } finally { - bb.release(); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { - logger.trace("{} Flushing", session); - remoteOutputStream.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // Close the connection when an exception is raised. - logger.warn("{} Unexpected exception from downstream", session, cause); - ctx.close(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.trace("{} channelInactive() called, closing remote client ctx", session); - remoteConnection.close();//this should close socket and all threads created for this client - this.channelHandlerContext = null; - } - - public ChannelHandlerContext getChannelHandlerContext() { - return checkNotNull(channelHandlerContext, "Channel is not active"); - } -} - -/** - * Ganymed handler that gets unencrypted input and output streams, connects them to netty. - * Checks that 'netconf' subsystem is chosen by user. - * Launches new ClientInputStreamPoolingThread thread once session is established. - * Writes custom header to netty server, to inform it about IP address and username. - */ -class ServerConnectionCallbackImpl implements ServerConnectionCallback { - private static final Logger logger = LoggerFactory.getLogger(ServerConnectionCallbackImpl.class); - public static final String NETCONF_SUBSYSTEM = "netconf"; - - private final Supplier currentUserSupplier; - private final String remoteAddress; - private final String remotePort; - private final String session; - private final AutoCloseable ganymedConnection; - private final LocalAddress localAddress; - private final EventLoopGroup bossGroup; - - ServerConnectionCallbackImpl(Supplier currentUserSupplier, String remoteAddress, String remotePort, String session, - AutoCloseable ganymedConnection, LocalAddress localAddress, EventLoopGroup bossGroup) { - this.currentUserSupplier = currentUserSupplier; - this.remoteAddress = remoteAddress; - this.remotePort = remotePort; - this.session = session; - this.ganymedConnection = ganymedConnection; - // initialize netty local connection - this.localAddress = localAddress; - this.bossGroup = bossGroup; - } - - private static ChannelFuture initializeNettyConnection(LocalAddress localAddress, EventLoopGroup bossGroup, - final SSHClientHandler sshClientHandler) { - Bootstrap clientBootstrap = new Bootstrap(); - clientBootstrap.group(bossGroup).channel(LocalChannel.class); - - clientBootstrap.handler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(sshClientHandler); - } - }); - // asynchronously initialize local connection to netconf server - return clientBootstrap.connect(localAddress); - } - - @Override - public ServerSessionCallback acceptSession(final ServerSession serverSession) { - String currentUser = currentUserSupplier.get(); - final String additionalHeader = new NetconfHelloMessageAdditionalHeader(currentUser, remoteAddress, - remotePort, "ssh", "client").toFormattedString(); - - - return new SimpleServerSessionCallback() { - @Override - public Runnable requestSubsystem(final ServerSession ss, final String subsystem) throws IOException { - return new Runnable() { - @Override - public void run() { - if (NETCONF_SUBSYSTEM.equals(subsystem)) { - // connect - final SSHClientHandler sshClientHandler = new SSHClientHandler(ganymedConnection, ss.getStdin(), session); - ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler); - // get channel - final Channel channel = clientChannelFuture.awaitUninterruptibly().channel(); - - // write additional header before polling thread is started - // polling thread could process and forward data before additional header is written - // This will result into unexpected state: hello message without additional header and the next message with additional header - channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes())); - - new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() { - @Override - public void close() throws Exception { - logger.trace("Closing both ganymed and local connection"); - try { - ganymedConnection.close(); - } catch (Exception e) { - logger.warn("Ignoring exception while closing ganymed", e); - } - try { - channel.close(); - } catch (Exception e) { - logger.warn("Ignoring exception while closing channel", e); - } - } - }, sshClientHandler.getChannelHandlerContext()).start(); - } else { - logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem); - String reason = "Only netconf subsystem is supported, requested:" + subsystem; - closeSession(ss, reason); - } - } - }; - } - - public void closeSession(ServerSession ss, String reason) { - logger.trace("{} Closing session - {}", serverSession, reason); - try { - ss.getStdin().write(reason.getBytes()); - } catch (IOException e) { - logger.warn("{} Exception while closing session", serverSession, e); - } - ss.close(); - } - - @Override - public Runnable requestPtyReq(final ServerSession ss, final PtySettings pty) throws IOException { - return new Runnable() { - @Override - public void run() { - closeSession(ss, "PTY request not supported"); - } - }; - } - - @Override - public Runnable requestShell(final ServerSession ss) throws IOException { - return new Runnable() { - @Override - public void run() { - closeSession(ss, "Shell not supported"); - } - }; - } - }; - } -} - -/** - * Only thread that is required during ssh session, forwards client's input to netty. - * When user closes connection, onEndOfInput.close() is called to tear down the local channel. - */ -class ClientInputStreamPoolingThread extends Thread { - private static final Logger logger = LoggerFactory.getLogger(ClientInputStreamPoolingThread.class); - - private final InputStream fromClientIS; - private final Channel serverChannel; - private final AutoCloseable onEndOfInput; - private final ChannelHandlerContext channelHandlerContext; - - ClientInputStreamPoolingThread(String session, InputStream fromClientIS, Channel serverChannel, AutoCloseable onEndOfInput, - ChannelHandlerContext channelHandlerContext) { - super(ClientInputStreamPoolingThread.class.getSimpleName() + " " + session); - this.fromClientIS = fromClientIS; - this.serverChannel = serverChannel; - this.onEndOfInput = onEndOfInput; - this.channelHandlerContext = channelHandlerContext; - } - - @Override - public void run() { - ChunkedStream chunkedStream = new ChunkedStream(fromClientIS); - try { - ByteBuf byteBuf; - while ((byteBuf = chunkedStream.readChunk(channelHandlerContext/*only needed for ByteBuf alloc */)) != null) { - serverChannel.writeAndFlush(byteBuf); - } - } catch (Exception e) { - logger.warn("Exception", e); - } finally { - logger.trace("End of input"); - // tear down connection - try { - onEndOfInput.close(); - } catch (Exception e) { - logger.warn("Ignoring exception while closing socket", e); - } - } - } -} - -/** - * Authentication handler for ganymed. - * Provides current user name after authenticating using supplied AuthProvider. - */ -@NotThreadSafe -class ServerAuthenticationCallbackImpl implements ServerAuthenticationCallback, Supplier { - private static final Logger logger = LoggerFactory.getLogger(ServerAuthenticationCallbackImpl.class); - private final AuthProvider authProvider; - private final String session; - private String currentUser; - - ServerAuthenticationCallbackImpl(AuthProvider authProvider, String session) { - this.authProvider = authProvider; - this.session = session; - } - - @Override - public String initAuthentication(ServerConnection sc) { - logger.trace("{} Established connection", session); - return "Established connection" + "\r\n"; - } - - @Override - public String[] getRemainingAuthMethods(ServerConnection sc) { - return new String[]{ServerAuthenticationCallback.METHOD_PASSWORD}; - } - - @Override - public AuthenticationResult authenticateWithNone(ServerConnection sc, String username) { - return AuthenticationResult.FAILURE; - } - - @Override - public AuthenticationResult authenticateWithPassword(ServerConnection sc, String username, String password) { - checkState(currentUser == null); - try { - if (authProvider.authenticated(username, password)) { - currentUser = username; - logger.trace("{} user {} authenticated", session, currentUser); - return AuthenticationResult.SUCCESS; - } - } catch (Exception e) { - logger.warn("{} Authentication failed", session, e); - } - return AuthenticationResult.FAILURE; - } - - @Override - public AuthenticationResult authenticateWithPublicKey(ServerConnection sc, String username, String algorithm, - byte[] publicKey, byte[] signature) { - return AuthenticationResult.FAILURE; - } - - @Override - public String get() { - return currentUser; - } -}