/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.netconf.ssh.threads;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
13 import java.io.BufferedOutputStream;
14 import java.io.IOException;
15 import java.io.InputStream;
16 import java.io.OutputStream;
17 import java.net.Socket;
19 import javax.annotation.concurrent.NotThreadSafe;
20 import javax.annotation.concurrent.ThreadSafe;
22 import org.opendaylight.controller.netconf.auth.AuthProvider;
23 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import ch.ethz.ssh2.AuthenticationResult;
28 import ch.ethz.ssh2.PtySettings;
29 import ch.ethz.ssh2.ServerAuthenticationCallback;
30 import ch.ethz.ssh2.ServerConnection;
31 import ch.ethz.ssh2.ServerConnectionCallback;
32 import ch.ethz.ssh2.ServerSession;
33 import ch.ethz.ssh2.ServerSessionCallback;
34 import ch.ethz.ssh2.SimpleServerSessionCallback;
36 import com.google.common.base.Supplier;
38 import io.netty.bootstrap.Bootstrap;
39 import io.netty.buffer.ByteBuf;
40 import io.netty.buffer.ByteBufProcessor;
41 import io.netty.buffer.Unpooled;
42 import io.netty.channel.Channel;
43 import io.netty.channel.ChannelFuture;
44 import io.netty.channel.ChannelHandlerContext;
45 import io.netty.channel.ChannelInboundHandlerAdapter;
46 import io.netty.channel.ChannelInitializer;
47 import io.netty.channel.EventLoopGroup;
48 import io.netty.channel.local.LocalAddress;
49 import io.netty.channel.local.LocalChannel;
50 import io.netty.handler.stream.ChunkedStream;
53 * One instance represents per connection, responsible for ssh handshake.
54 * Once auth succeeds and correct subsystem is chosen, backend connection with
55 * netty netconf server is made. This task finishes right after negotiation is done.
58 public class Handshaker implements Runnable {
59 private static final Logger logger = LoggerFactory.getLogger(Handshaker.class);
61 private final ServerConnection ganymedConnection;
62 private final String session;
65 public Handshaker(Socket socket, LocalAddress localAddress, long sessionId, AuthProvider authProvider,
66 EventLoopGroup bossGroup, final char[] pem) throws IOException {
68 this.session = "Session " + sessionId;
70 String remoteAddressWithPort = socket.getRemoteSocketAddress().toString().replace("/", "");
71 logger.debug("{} started with {}", session, remoteAddressWithPort);
72 String remoteAddress, remotePort;
73 if (remoteAddressWithPort.contains(":")) {
74 String[] split = remoteAddressWithPort.split(":");
75 remoteAddress = split[0];
76 remotePort = split[1];
78 remoteAddress = remoteAddressWithPort;
81 ServerAuthenticationCallbackImpl serverAuthenticationCallback = new ServerAuthenticationCallbackImpl(
82 authProvider, session);
84 ganymedConnection = new ServerConnection(socket);
86 ServerConnectionCallbackImpl serverConnectionCallback = new ServerConnectionCallbackImpl(
87 serverAuthenticationCallback, remoteAddress, remotePort, session,
88 getGanymedAutoCloseable(ganymedConnection), localAddress, bossGroup);
91 ganymedConnection.setPEMHostKey(pem, null);
92 ganymedConnection.setAuthenticationCallback(serverAuthenticationCallback);
93 ganymedConnection.setServerConnectionCallback(serverConnectionCallback);
97 private static AutoCloseable getGanymedAutoCloseable(final ServerConnection ganymedConnection) {
98 return new AutoCloseable() {
100 public void close() throws Exception {
101 ganymedConnection.close();
108 // let ganymed process handshake
109 logger.trace("{} is started", session);
111 // TODO this should be guarded with a timer to prevent resource exhaustion
112 ganymedConnection.connect();
113 } catch (IOException e) {
114 logger.debug("{} connection error", session, e);
116 logger.trace("{} is exiting", session);
121 * Netty client handler that forwards bytes from backed server to supplied output stream.
122 * When backend server closes the connection, remoteConnection.close() is called to tear
123 * down ssh connection.
125 class SSHClientHandler extends ChannelInboundHandlerAdapter {
126 private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class);
127 private final AutoCloseable remoteConnection;
128 private final BufferedOutputStream remoteOutputStream;
129 private final String session;
130 private ChannelHandlerContext channelHandlerContext;
132 public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream,
134 this.remoteConnection = remoteConnection;
135 this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream);
136 this.session = session;
140 public void channelActive(ChannelHandlerContext ctx) {
141 this.channelHandlerContext = ctx;
142 logger.debug("{} Client active", session);
146 public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
147 ByteBuf bb = (ByteBuf) msg;
148 // we can block the server here so that slow client does not cause memory pressure
150 bb.forEachByte(new ByteBufProcessor() {
152 public boolean process(byte value) throws Exception {
153 remoteOutputStream.write(value);
163 public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
164 logger.trace("{} Flushing", session);
165 remoteOutputStream.flush();
169 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
170 // Close the connection when an exception is raised.
171 logger.warn("{} Unexpected exception from downstream", session, cause);
176 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
177 logger.trace("{} channelInactive() called, closing remote client ctx", session);
178 remoteConnection.close();//this should close socket and all threads created for this client
179 this.channelHandlerContext = null;
182 public ChannelHandlerContext getChannelHandlerContext() {
183 return checkNotNull(channelHandlerContext, "Channel is not active");
188 * Ganymed handler that gets unencrypted input and output streams, connects them to netty.
189 * Checks that 'netconf' subsystem is chosen by user.
190 * Launches new ClientInputStreamPoolingThread thread once session is established.
191 * Writes custom header to netty server, to inform it about IP address and username.
193 class ServerConnectionCallbackImpl implements ServerConnectionCallback {
194 private static final Logger logger = LoggerFactory.getLogger(ServerConnectionCallbackImpl.class);
195 public static final String NETCONF_SUBSYSTEM = "netconf";
197 private final Supplier<String> currentUserSupplier;
198 private final String remoteAddress;
199 private final String remotePort;
200 private final String session;
201 private final AutoCloseable ganymedConnection;
202 private final LocalAddress localAddress;
203 private final EventLoopGroup bossGroup;
205 ServerConnectionCallbackImpl(Supplier<String> currentUserSupplier, String remoteAddress, String remotePort, String session,
206 AutoCloseable ganymedConnection, LocalAddress localAddress, EventLoopGroup bossGroup) {
207 this.currentUserSupplier = currentUserSupplier;
208 this.remoteAddress = remoteAddress;
209 this.remotePort = remotePort;
210 this.session = session;
211 this.ganymedConnection = ganymedConnection;
212 // initialize netty local connection
213 this.localAddress = localAddress;
214 this.bossGroup = bossGroup;
217 private static ChannelFuture initializeNettyConnection(LocalAddress localAddress, EventLoopGroup bossGroup,
218 final SSHClientHandler sshClientHandler) {
219 Bootstrap clientBootstrap = new Bootstrap();
220 clientBootstrap.group(bossGroup).channel(LocalChannel.class);
222 clientBootstrap.handler(new ChannelInitializer<LocalChannel>() {
224 public void initChannel(LocalChannel ch) throws Exception {
225 ch.pipeline().addLast(sshClientHandler);
228 // asynchronously initialize local connection to netconf server
229 return clientBootstrap.connect(localAddress);
233 public ServerSessionCallback acceptSession(final ServerSession serverSession) {
234 String currentUser = currentUserSupplier.get();
235 final String additionalHeader = new NetconfHelloMessageAdditionalHeader(currentUser, remoteAddress,
236 remotePort, "ssh", "client").toFormattedString();
239 return new SimpleServerSessionCallback() {
241 public Runnable requestSubsystem(final ServerSession ss, final String subsystem) throws IOException {
242 return new Runnable() {
245 if (NETCONF_SUBSYSTEM.equals(subsystem)) {
247 final SSHClientHandler sshClientHandler = new SSHClientHandler(ganymedConnection, ss.getStdin(), session);
248 ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler);
250 final Channel channel = clientChannelFuture.awaitUninterruptibly().channel();
252 // write additional header before polling thread is started
253 // polling thread could process and forward data before additional header is written
254 // This will result into unexpected state: hello message without additional header and the next message with additional header
255 channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
257 new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() {
259 public void close() throws Exception {
260 logger.trace("Closing both ganymed and local connection");
262 ganymedConnection.close();
263 } catch (Exception e) {
264 logger.warn("Ignoring exception while closing ganymed", e);
268 } catch (Exception e) {
269 logger.warn("Ignoring exception while closing channel", e);
272 }, sshClientHandler.getChannelHandlerContext()).start();
274 logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem);
275 String reason = "Only netconf subsystem is supported, requested:" + subsystem;
276 closeSession(ss, reason);
282 public void closeSession(ServerSession ss, String reason) {
283 logger.trace("{} Closing session - {}", serverSession, reason);
285 ss.getStdin().write(reason.getBytes());
286 } catch (IOException e) {
287 logger.warn("{} Exception while closing session", serverSession, e);
293 public Runnable requestPtyReq(final ServerSession ss, final PtySettings pty) throws IOException {
294 return new Runnable() {
297 closeSession(ss, "PTY request not supported");
303 public Runnable requestShell(final ServerSession ss) throws IOException {
304 return new Runnable() {
307 closeSession(ss, "Shell not supported");
316 * Only thread that is required during ssh session, forwards client's input to netty.
317 * When user closes connection, onEndOfInput.close() is called to tear down the local channel.
319 class ClientInputStreamPoolingThread extends Thread {
320 private static final Logger logger = LoggerFactory.getLogger(ClientInputStreamPoolingThread.class);
322 private final InputStream fromClientIS;
323 private final Channel serverChannel;
324 private final AutoCloseable onEndOfInput;
325 private final ChannelHandlerContext channelHandlerContext;
327 ClientInputStreamPoolingThread(String session, InputStream fromClientIS, Channel serverChannel, AutoCloseable onEndOfInput,
328 ChannelHandlerContext channelHandlerContext) {
329 super(ClientInputStreamPoolingThread.class.getSimpleName() + " " + session);
330 this.fromClientIS = fromClientIS;
331 this.serverChannel = serverChannel;
332 this.onEndOfInput = onEndOfInput;
333 this.channelHandlerContext = channelHandlerContext;
338 ChunkedStream chunkedStream = new ChunkedStream(fromClientIS);
341 while ((byteBuf = chunkedStream.readChunk(channelHandlerContext/*only needed for ByteBuf alloc */)) != null) {
342 serverChannel.writeAndFlush(byteBuf);
344 } catch (Exception e) {
345 logger.warn("Exception", e);
347 logger.trace("End of input");
348 // tear down connection
350 onEndOfInput.close();
351 } catch (Exception e) {
352 logger.warn("Ignoring exception while closing socket", e);
/**
 * Authentication handler for ganymed.
 * Provides the current user name after authenticating with the supplied AuthProvider.
 */
363 class ServerAuthenticationCallbackImpl implements ServerAuthenticationCallback, Supplier<String> {
364 private static final Logger logger = LoggerFactory.getLogger(ServerAuthenticationCallbackImpl.class);
365 private final AuthProvider authProvider;
366 private final String session;
367 private String currentUser;
/**
 * @param authProvider asked to verify user name and password
 * @param session      label that prefixes the log messages of this callback
 */
ServerAuthenticationCallbackImpl(AuthProvider authProvider, String session) {
    this.session = session;
    this.authProvider = authProvider;
}
375 public String initAuthentication(ServerConnection sc) {
376 logger.trace("{} Established connection", session);
377 return "Established connection" + "\r\n";
381 public String[] getRemainingAuthMethods(ServerConnection sc) {
382 return new String[]{ServerAuthenticationCallback.METHOD_PASSWORD};
386 public AuthenticationResult authenticateWithNone(ServerConnection sc, String username) {
387 return AuthenticationResult.FAILURE;
391 public AuthenticationResult authenticateWithPassword(ServerConnection sc, String username, String password) {
392 checkState(currentUser == null);
394 if (authProvider.authenticated(username, password)) {
395 currentUser = username;
396 logger.trace("{} user {} authenticated", session, currentUser);
397 return AuthenticationResult.SUCCESS;
399 } catch (Exception e) {
400 logger.warn("{} Authentication failed", session, e);
402 return AuthenticationResult.FAILURE;
406 public AuthenticationResult authenticateWithPublicKey(ServerConnection sc, String username, String algorithm,
407 byte[] publicKey, byte[] signature) {
408 return AuthenticationResult.FAILURE;
412 public String get() {