Merge "Bug 1636: Config Netconf Connector did not serialize service type"
[controller.git] / opendaylight / netconf / netconf-ssh / src / main / java / org / opendaylight / controller / netconf / ssh / threads / Handshaker.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.netconf.ssh.threads;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.io.BufferedOutputStream;
14 import java.io.IOException;
15 import java.io.InputStream;
16 import java.io.OutputStream;
17 import java.net.Socket;
18
19 import javax.annotation.concurrent.NotThreadSafe;
20 import javax.annotation.concurrent.ThreadSafe;
21
22 import org.opendaylight.controller.netconf.auth.AuthProvider;
23 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import ch.ethz.ssh2.AuthenticationResult;
28 import ch.ethz.ssh2.PtySettings;
29 import ch.ethz.ssh2.ServerAuthenticationCallback;
30 import ch.ethz.ssh2.ServerConnection;
31 import ch.ethz.ssh2.ServerConnectionCallback;
32 import ch.ethz.ssh2.ServerSession;
33 import ch.ethz.ssh2.ServerSessionCallback;
34 import ch.ethz.ssh2.SimpleServerSessionCallback;
35
36 import com.google.common.base.Supplier;
37
38 import io.netty.bootstrap.Bootstrap;
39 import io.netty.buffer.ByteBuf;
40 import io.netty.buffer.ByteBufProcessor;
41 import io.netty.buffer.Unpooled;
42 import io.netty.channel.Channel;
43 import io.netty.channel.ChannelFuture;
44 import io.netty.channel.ChannelHandlerContext;
45 import io.netty.channel.ChannelInboundHandlerAdapter;
46 import io.netty.channel.ChannelInitializer;
47 import io.netty.channel.EventLoopGroup;
48 import io.netty.channel.local.LocalAddress;
49 import io.netty.channel.local.LocalChannel;
50 import io.netty.handler.stream.ChunkedStream;
51
52 /**
53  * One instance represents per connection, responsible for ssh handshake.
54  * Once auth succeeds and correct subsystem is chosen, backend connection with
55  * netty netconf server is made. This task finishes right after negotiation is done.
56  */
57 @ThreadSafe
58 public class Handshaker implements Runnable {
59     private static final Logger logger = LoggerFactory.getLogger(Handshaker.class);
60
61     private final ServerConnection ganymedConnection;
62     private final String session;
63
64
65     public Handshaker(Socket socket, LocalAddress localAddress, long sessionId, AuthProvider authProvider,
66                       EventLoopGroup bossGroup, final char[] pem) throws IOException {
67
68         this.session = "Session " + sessionId;
69
70         String remoteAddressWithPort = socket.getRemoteSocketAddress().toString().replace("/", "");
71         logger.debug("{} started with {}", session, remoteAddressWithPort);
72         String remoteAddress, remotePort;
73         if (remoteAddressWithPort.contains(":")) {
74             String[] split = remoteAddressWithPort.split(":");
75             remoteAddress = split[0];
76             remotePort = split[1];
77         } else {
78             remoteAddress = remoteAddressWithPort;
79             remotePort = "";
80         }
81         ServerAuthenticationCallbackImpl serverAuthenticationCallback = new ServerAuthenticationCallbackImpl(
82                 authProvider, session);
83
84         ganymedConnection = new ServerConnection(socket);
85
86         ServerConnectionCallbackImpl serverConnectionCallback = new ServerConnectionCallbackImpl(
87                 serverAuthenticationCallback, remoteAddress, remotePort, session,
88                 getGanymedAutoCloseable(ganymedConnection), localAddress, bossGroup);
89
90         // initialize ganymed
91         ganymedConnection.setPEMHostKey(pem, null);
92         ganymedConnection.setAuthenticationCallback(serverAuthenticationCallback);
93         ganymedConnection.setServerConnectionCallback(serverConnectionCallback);
94     }
95
96
97     private static AutoCloseable getGanymedAutoCloseable(final ServerConnection ganymedConnection) {
98         return new AutoCloseable() {
99             @Override
100             public void close() throws Exception {
101                 ganymedConnection.close();
102             }
103         };
104     }
105
106     @Override
107     public void run() {
108         // let ganymed process handshake
109         logger.trace("{} is started", session);
110         try {
111             // TODO this should be guarded with a timer to prevent resource exhaustion
112             ganymedConnection.connect();
113         } catch (IOException e) {
114             logger.debug("{} connection error", session, e);
115         }
116         logger.trace("{} is exiting", session);
117     }
118 }
119
120 /**
121  * Netty client handler that forwards bytes from backed server to supplied output stream.
122  * When backend server closes the connection, remoteConnection.close() is called to tear
123  * down ssh connection.
124  */
125 class SSHClientHandler extends ChannelInboundHandlerAdapter {
126     private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class);
127     private final AutoCloseable remoteConnection;
128     private final BufferedOutputStream remoteOutputStream;
129     private final String session;
130     private ChannelHandlerContext channelHandlerContext;
131
132     public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream,
133                             String session) {
134         this.remoteConnection = remoteConnection;
135         this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream);
136         this.session = session;
137     }
138
139     @Override
140     public void channelActive(ChannelHandlerContext ctx) {
141         this.channelHandlerContext = ctx;
142         logger.debug("{} Client active", session);
143     }
144
145     @Override
146     public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
147         ByteBuf bb = (ByteBuf) msg;
148         // we can block the server here so that slow client does not cause memory pressure
149         try {
150             bb.forEachByte(new ByteBufProcessor() {
151                 @Override
152                 public boolean process(byte value) throws Exception {
153                     remoteOutputStream.write(value);
154                     return true;
155                 }
156             });
157         } finally {
158             bb.release();
159         }
160     }
161
162     @Override
163     public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
164         logger.trace("{} Flushing", session);
165         remoteOutputStream.flush();
166     }
167
168     @Override
169     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
170         // Close the connection when an exception is raised.
171         logger.warn("{} Unexpected exception from downstream", session, cause);
172         ctx.close();
173     }
174
175     @Override
176     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
177         logger.trace("{} channelInactive() called, closing remote client ctx", session);
178         remoteConnection.close();//this should close socket and all threads created for this client
179         this.channelHandlerContext = null;
180     }
181
182     public ChannelHandlerContext getChannelHandlerContext() {
183         return checkNotNull(channelHandlerContext, "Channel is not active");
184     }
185 }
186
187 /**
188  * Ganymed handler that gets unencrypted input and output streams, connects them to netty.
189  * Checks that 'netconf' subsystem is chosen by user.
190  * Launches new ClientInputStreamPoolingThread thread once session is established.
191  * Writes custom header to netty server, to inform it about IP address and username.
192  */
193 class ServerConnectionCallbackImpl implements ServerConnectionCallback {
194     private static final Logger logger = LoggerFactory.getLogger(ServerConnectionCallbackImpl.class);
195     public static final String NETCONF_SUBSYSTEM = "netconf";
196
197     private final Supplier<String> currentUserSupplier;
198     private final String remoteAddress;
199     private final String remotePort;
200     private final String session;
201     private final AutoCloseable ganymedConnection;
202     private final LocalAddress localAddress;
203     private final EventLoopGroup bossGroup;
204
205     ServerConnectionCallbackImpl(Supplier<String> currentUserSupplier, String remoteAddress, String remotePort, String session,
206                                  AutoCloseable ganymedConnection, LocalAddress localAddress, EventLoopGroup bossGroup) {
207         this.currentUserSupplier = currentUserSupplier;
208         this.remoteAddress = remoteAddress;
209         this.remotePort = remotePort;
210         this.session = session;
211         this.ganymedConnection = ganymedConnection;
212         // initialize netty local connection
213         this.localAddress = localAddress;
214         this.bossGroup = bossGroup;
215     }
216
217     private static ChannelFuture initializeNettyConnection(LocalAddress localAddress, EventLoopGroup bossGroup,
218                                                            final SSHClientHandler sshClientHandler) {
219         Bootstrap clientBootstrap = new Bootstrap();
220         clientBootstrap.group(bossGroup).channel(LocalChannel.class);
221
222         clientBootstrap.handler(new ChannelInitializer<LocalChannel>() {
223             @Override
224             public void initChannel(LocalChannel ch) throws Exception {
225                 ch.pipeline().addLast(sshClientHandler);
226             }
227         });
228         // asynchronously initialize local connection to netconf server
229         return clientBootstrap.connect(localAddress);
230     }
231
232     @Override
233     public ServerSessionCallback acceptSession(final ServerSession serverSession) {
234         String currentUser = currentUserSupplier.get();
235         final String additionalHeader = new NetconfHelloMessageAdditionalHeader(currentUser, remoteAddress,
236                 remotePort, "ssh", "client").toFormattedString();
237
238
239         return new SimpleServerSessionCallback() {
240             @Override
241             public Runnable requestSubsystem(final ServerSession ss, final String subsystem) throws IOException {
242                 return new Runnable() {
243                     @Override
244                     public void run() {
245                         if (NETCONF_SUBSYSTEM.equals(subsystem)) {
246                             // connect
247                             final SSHClientHandler sshClientHandler = new SSHClientHandler(ganymedConnection, ss.getStdin(), session);
248                             ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler);
249                             // get channel
250                             final Channel channel = clientChannelFuture.awaitUninterruptibly().channel();
251
252                             // write additional header before polling thread is started
253                             // polling thread could process and forward data before additional header is written
254                             // This will result into unexpected state:  hello message without additional header and the next message with additional header
255                             channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
256
257                             new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() {
258                                 @Override
259                                 public void close() throws Exception {
260                                     logger.trace("Closing both ganymed and local connection");
261                                     try {
262                                         ganymedConnection.close();
263                                     } catch (Exception e) {
264                                         logger.warn("Ignoring exception while closing ganymed", e);
265                                     }
266                                     try {
267                                         channel.close();
268                                     } catch (Exception e) {
269                                         logger.warn("Ignoring exception while closing channel", e);
270                                     }
271                                 }
272                             }, sshClientHandler.getChannelHandlerContext()).start();
273                         } else {
274                             logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem);
275                             String reason = "Only netconf subsystem is supported, requested:" + subsystem;
276                             closeSession(ss, reason);
277                         }
278                     }
279                 };
280             }
281
282             public void closeSession(ServerSession ss, String reason) {
283                 logger.trace("{} Closing session - {}", serverSession, reason);
284                 try {
285                     ss.getStdin().write(reason.getBytes());
286                 } catch (IOException e) {
287                     logger.warn("{} Exception while closing session", serverSession, e);
288                 }
289                 ss.close();
290             }
291
292             @Override
293             public Runnable requestPtyReq(final ServerSession ss, final PtySettings pty) throws IOException {
294                 return new Runnable() {
295                     @Override
296                     public void run() {
297                         closeSession(ss, "PTY request not supported");
298                     }
299                 };
300             }
301
302             @Override
303             public Runnable requestShell(final ServerSession ss) throws IOException {
304                 return new Runnable() {
305                     @Override
306                     public void run() {
307                         closeSession(ss, "Shell not supported");
308                     }
309                 };
310             }
311         };
312     }
313 }
314
315 /**
316  * Only thread that is required during ssh session, forwards client's input to netty.
317  * When user closes connection, onEndOfInput.close() is called to tear down the local channel.
318  */
319 class ClientInputStreamPoolingThread extends Thread {
320     private static final Logger logger = LoggerFactory.getLogger(ClientInputStreamPoolingThread.class);
321
322     private final InputStream fromClientIS;
323     private final Channel serverChannel;
324     private final AutoCloseable onEndOfInput;
325     private final ChannelHandlerContext channelHandlerContext;
326
327     ClientInputStreamPoolingThread(String session, InputStream fromClientIS, Channel serverChannel, AutoCloseable onEndOfInput,
328                                    ChannelHandlerContext channelHandlerContext) {
329         super(ClientInputStreamPoolingThread.class.getSimpleName() + " " + session);
330         this.fromClientIS = fromClientIS;
331         this.serverChannel = serverChannel;
332         this.onEndOfInput = onEndOfInput;
333         this.channelHandlerContext = channelHandlerContext;
334     }
335
336     @Override
337     public void run() {
338         ChunkedStream chunkedStream = new ChunkedStream(fromClientIS);
339         try {
340             ByteBuf byteBuf;
341             while ((byteBuf = chunkedStream.readChunk(channelHandlerContext/*only needed for ByteBuf alloc */)) != null) {
342                 serverChannel.writeAndFlush(byteBuf);
343             }
344         } catch (Exception e) {
345             logger.warn("Exception", e);
346         } finally {
347             logger.trace("End of input");
348             // tear down connection
349             try {
350                 onEndOfInput.close();
351             } catch (Exception e) {
352                 logger.warn("Ignoring exception while closing socket", e);
353             }
354         }
355     }
356 }
357
358 /**
359  * Authentication handler for ganymed.
360  * Provides current user name after authenticating using supplied AuthProvider.
361  */
362 @NotThreadSafe
363 class ServerAuthenticationCallbackImpl implements ServerAuthenticationCallback, Supplier<String> {
364     private static final Logger logger = LoggerFactory.getLogger(ServerAuthenticationCallbackImpl.class);
365     private final AuthProvider authProvider;
366     private final String session;
367     private String currentUser;
368
369     ServerAuthenticationCallbackImpl(AuthProvider authProvider, String session) {
370         this.authProvider = authProvider;
371         this.session = session;
372     }
373
374     @Override
375     public String initAuthentication(ServerConnection sc) {
376         logger.trace("{} Established connection", session);
377         return "Established connection" + "\r\n";
378     }
379
380     @Override
381     public String[] getRemainingAuthMethods(ServerConnection sc) {
382         return new String[]{ServerAuthenticationCallback.METHOD_PASSWORD};
383     }
384
385     @Override
386     public AuthenticationResult authenticateWithNone(ServerConnection sc, String username) {
387         return AuthenticationResult.FAILURE;
388     }
389
390     @Override
391     public AuthenticationResult authenticateWithPassword(ServerConnection sc, String username, String password) {
392         checkState(currentUser == null);
393         try {
394             if (authProvider.authenticated(username, password)) {
395                 currentUser = username;
396                 logger.trace("{} user {} authenticated", session, currentUser);
397                 return AuthenticationResult.SUCCESS;
398             }
399         } catch (Exception e) {
400             logger.warn("{} Authentication failed", session, e);
401         }
402         return AuthenticationResult.FAILURE;
403     }
404
405     @Override
406     public AuthenticationResult authenticateWithPublicKey(ServerConnection sc, String username, String algorithm,
407                                                           byte[] publicKey, byte[] signature) {
408         return AuthenticationResult.FAILURE;
409     }
410
411     @Override
412     public String get() {
413         return currentUser;
414     }
415 }