2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.netconf.ssh.threads;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
13 import ch.ethz.ssh2.AuthenticationResult;
14 import ch.ethz.ssh2.PtySettings;
15 import ch.ethz.ssh2.ServerAuthenticationCallback;
16 import ch.ethz.ssh2.ServerConnection;
17 import ch.ethz.ssh2.ServerConnectionCallback;
18 import ch.ethz.ssh2.ServerSession;
19 import ch.ethz.ssh2.ServerSessionCallback;
20 import ch.ethz.ssh2.SimpleServerSessionCallback;
21 import com.google.common.base.Supplier;
22 import io.netty.bootstrap.Bootstrap;
23 import io.netty.buffer.ByteBuf;
24 import io.netty.buffer.ByteBufProcessor;
25 import io.netty.buffer.Unpooled;
26 import io.netty.channel.Channel;
27 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandlerAdapter;
30 import io.netty.channel.ChannelInitializer;
31 import io.netty.channel.EventLoopGroup;
32 import io.netty.channel.local.LocalAddress;
33 import io.netty.channel.local.LocalChannel;
34 import io.netty.handler.stream.ChunkedStream;
35 import java.io.BufferedOutputStream;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.net.Socket;
40 import javax.annotation.concurrent.NotThreadSafe;
41 import javax.annotation.concurrent.ThreadSafe;
42 import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider;
43 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * One instance represents per connection, responsible for ssh handshake.
49 * Once auth succeeds and correct subsystem is chosen, backend connection with
50 * netty netconf server is made. This task finishes right after negotiation is done.
53 public class Handshaker implements Runnable {
54 private static final Logger logger = LoggerFactory.getLogger(Handshaker.class);
56 private final ServerConnection ganymedConnection;
57 private final String session;
60 public Handshaker(Socket socket, LocalAddress localAddress, long sessionId, AuthProvider authProvider,
61 EventLoopGroup bossGroup) throws IOException {
63 this.session = "Session " + sessionId;
65 String remoteAddressWithPort = socket.getRemoteSocketAddress().toString().replace("/", "");
66 logger.debug("{} started with {}", session, remoteAddressWithPort);
67 String remoteAddress, remotePort;
68 if (remoteAddressWithPort.contains(":")) {
69 String[] split = remoteAddressWithPort.split(":");
70 remoteAddress = split[0];
71 remotePort = split[1];
73 remoteAddress = remoteAddressWithPort;
76 ServerAuthenticationCallbackImpl serverAuthenticationCallback = new ServerAuthenticationCallbackImpl(
77 authProvider, session);
79 ganymedConnection = new ServerConnection(socket);
81 ServerConnectionCallbackImpl serverConnectionCallback = new ServerConnectionCallbackImpl(
82 serverAuthenticationCallback, remoteAddress, remotePort, session,
83 getGanymedAutoCloseable(ganymedConnection), localAddress, bossGroup);
86 ganymedConnection.setPEMHostKey(authProvider.getPEMAsCharArray(), null);
87 ganymedConnection.setAuthenticationCallback(serverAuthenticationCallback);
88 ganymedConnection.setServerConnectionCallback(serverConnectionCallback);
92 private static AutoCloseable getGanymedAutoCloseable(final ServerConnection ganymedConnection) {
93 return new AutoCloseable() {
95 public void close() throws Exception {
96 ganymedConnection.close();
103 // let ganymed process handshake
104 logger.trace("{} is started", session);
106 // TODO this should be guarded with a timer to prevent resource exhaustion
107 ganymedConnection.connect();
108 } catch (IOException e) {
109 logger.debug("{} connection error", session, e);
111 logger.trace("{} is exiting", session);
116 * Netty client handler that forwards bytes from backed server to supplied output stream.
117 * When backend server closes the connection, remoteConnection.close() is called to tear
118 * down ssh connection.
120 class SSHClientHandler extends ChannelInboundHandlerAdapter {
121 private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class);
122 private final AutoCloseable remoteConnection;
123 private final BufferedOutputStream remoteOutputStream;
124 private final String session;
125 private ChannelHandlerContext channelHandlerContext;
127 public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream,
129 this.remoteConnection = remoteConnection;
130 this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream);
131 this.session = session;
135 public void channelActive(ChannelHandlerContext ctx) {
136 this.channelHandlerContext = ctx;
137 logger.debug("{} Client active", session);
141 public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
142 ByteBuf bb = (ByteBuf) msg;
143 // we can block the server here so that slow client does not cause memory pressure
145 bb.forEachByte(new ByteBufProcessor() {
147 public boolean process(byte value) throws Exception {
148 remoteOutputStream.write(value);
158 public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
159 logger.trace("{} Flushing", session);
160 remoteOutputStream.flush();
164 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
165 // Close the connection when an exception is raised.
166 logger.warn("{} Unexpected exception from downstream", session, cause);
171 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
172 logger.trace("{} channelInactive() called, closing remote client ctx", session);
173 remoteConnection.close();//this should close socket and all threads created for this client
174 this.channelHandlerContext = null;
177 public ChannelHandlerContext getChannelHandlerContext() {
178 return checkNotNull(channelHandlerContext, "Channel is not active");
183 * Ganymed handler that gets unencrypted input and output streams, connects them to netty.
184 * Checks that 'netconf' subsystem is chosen by user.
185 * Launches new ClientInputStreamPoolingThread thread once session is established.
186 * Writes custom header to netty server, to inform it about IP address and username.
188 class ServerConnectionCallbackImpl implements ServerConnectionCallback {
189 private static final Logger logger = LoggerFactory.getLogger(ServerConnectionCallbackImpl.class);
190 public static final String NETCONF_SUBSYSTEM = "netconf";
192 private final Supplier<String> currentUserSupplier;
193 private final String remoteAddress;
194 private final String remotePort;
195 private final String session;
196 private final AutoCloseable ganymedConnection;
197 private final LocalAddress localAddress;
198 private final EventLoopGroup bossGroup;
200 ServerConnectionCallbackImpl(Supplier<String> currentUserSupplier, String remoteAddress, String remotePort, String session,
201 AutoCloseable ganymedConnection, LocalAddress localAddress, EventLoopGroup bossGroup) {
202 this.currentUserSupplier = currentUserSupplier;
203 this.remoteAddress = remoteAddress;
204 this.remotePort = remotePort;
205 this.session = session;
206 this.ganymedConnection = ganymedConnection;
207 // initialize netty local connection
208 this.localAddress = localAddress;
209 this.bossGroup = bossGroup;
212 private static ChannelFuture initializeNettyConnection(LocalAddress localAddress, EventLoopGroup bossGroup,
213 final SSHClientHandler sshClientHandler) {
214 Bootstrap clientBootstrap = new Bootstrap();
215 clientBootstrap.group(bossGroup).channel(LocalChannel.class);
217 clientBootstrap.handler(new ChannelInitializer<LocalChannel>() {
219 public void initChannel(LocalChannel ch) throws Exception {
220 ch.pipeline().addLast(sshClientHandler);
223 // asynchronously initialize local connection to netconf server
224 return clientBootstrap.connect(localAddress);
228 public ServerSessionCallback acceptSession(final ServerSession serverSession) {
229 String currentUser = currentUserSupplier.get();
230 final String additionalHeader = new NetconfHelloMessageAdditionalHeader(currentUser, remoteAddress,
231 remotePort, "ssh", "client").toFormattedString();
234 return new SimpleServerSessionCallback() {
236 public Runnable requestSubsystem(final ServerSession ss, final String subsystem) throws IOException {
237 return new Runnable() {
240 if (NETCONF_SUBSYSTEM.equals(subsystem)) {
242 final SSHClientHandler sshClientHandler = new SSHClientHandler(ganymedConnection, ss.getStdin(), session);
243 ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler);
245 final Channel channel = clientChannelFuture.awaitUninterruptibly().channel();
247 // write additional header before polling thread is started
248 // polling thread could process and forward data before additional header is written
249 // This will result into unexpected state: hello message without additional header and the next message with additional header
250 channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
252 new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() {
254 public void close() throws Exception {
255 logger.trace("Closing both ganymed and local connection");
257 ganymedConnection.close();
258 } catch (Exception e) {
259 logger.warn("Ignoring exception while closing ganymed", e);
263 } catch (Exception e) {
264 logger.warn("Ignoring exception while closing channel", e);
267 }, sshClientHandler.getChannelHandlerContext()).start();
269 logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem);
270 String reason = "Only netconf subsystem is supported, requested:" + subsystem;
271 closeSession(ss, reason);
277 public void closeSession(ServerSession ss, String reason) {
278 logger.trace("{} Closing session - {}", serverSession, reason);
280 ss.getStdin().write(reason.getBytes());
281 } catch (IOException e) {
282 logger.warn("{} Exception while closing session", serverSession, e);
288 public Runnable requestPtyReq(final ServerSession ss, final PtySettings pty) throws IOException {
289 return new Runnable() {
292 closeSession(ss, "PTY request not supported");
298 public Runnable requestShell(final ServerSession ss) throws IOException {
299 return new Runnable() {
302 closeSession(ss, "Shell not supported");
311 * Only thread that is required during ssh session, forwards client's input to netty.
312 * When user closes connection, onEndOfInput.close() is called to tear down the local channel.
314 class ClientInputStreamPoolingThread extends Thread {
315 private static final Logger logger = LoggerFactory.getLogger(ClientInputStreamPoolingThread.class);
317 private final InputStream fromClientIS;
318 private final Channel serverChannel;
319 private final AutoCloseable onEndOfInput;
320 private final ChannelHandlerContext channelHandlerContext;
322 ClientInputStreamPoolingThread(String session, InputStream fromClientIS, Channel serverChannel, AutoCloseable onEndOfInput,
323 ChannelHandlerContext channelHandlerContext) {
324 super(ClientInputStreamPoolingThread.class.getSimpleName() + " " + session);
325 this.fromClientIS = fromClientIS;
326 this.serverChannel = serverChannel;
327 this.onEndOfInput = onEndOfInput;
328 this.channelHandlerContext = channelHandlerContext;
333 ChunkedStream chunkedStream = new ChunkedStream(fromClientIS);
336 while ((byteBuf = chunkedStream.readChunk(channelHandlerContext/*only needed for ByteBuf alloc */)) != null) {
337 serverChannel.writeAndFlush(byteBuf);
339 } catch (Exception e) {
340 logger.warn("Exception", e);
342 logger.trace("End of input");
343 // tear down connection
345 onEndOfInput.close();
346 } catch (Exception e) {
347 logger.warn("Ignoring exception while closing socket", e);
354 * Authentication handler for ganymed.
355 * Provides current user name after authenticating using supplied AuthProvider.
358 class ServerAuthenticationCallbackImpl implements ServerAuthenticationCallback, Supplier<String> {
359 private static final Logger logger = LoggerFactory.getLogger(ServerAuthenticationCallbackImpl.class);
360 private final AuthProvider authProvider;
361 private final String session;
362 private String currentUser;
364 ServerAuthenticationCallbackImpl(AuthProvider authProvider, String session) {
365 this.authProvider = authProvider;
366 this.session = session;
370 public String initAuthentication(ServerConnection sc) {
371 logger.trace("{} Established connection", session);
372 return "Established connection" + "\r\n";
376 public String[] getRemainingAuthMethods(ServerConnection sc) {
377 return new String[]{ServerAuthenticationCallback.METHOD_PASSWORD};
381 public AuthenticationResult authenticateWithNone(ServerConnection sc, String username) {
382 return AuthenticationResult.FAILURE;
386 public AuthenticationResult authenticateWithPassword(ServerConnection sc, String username, String password) {
387 checkState(currentUser == null);
389 if (authProvider.authenticated(username, password)) {
390 currentUser = username;
391 logger.trace("{} user {} authenticated", session, currentUser);
392 return AuthenticationResult.SUCCESS;
394 } catch (Exception e) {
395 logger.warn("{} Authentication failed", session, e);
397 return AuthenticationResult.FAILURE;
401 public AuthenticationResult authenticateWithPublicKey(ServerConnection sc, String username, String algorithm,
402 byte[] publicKey, byte[] signature) {
403 return AuthenticationResult.FAILURE;
407 public String get() {