2 * Copyright (c) 2006-2013 Christian Plattner. All rights reserved.
3 * Please refer to the LICENSE.txt for licensing details.
6 package ch.ethz.ssh2.transport;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.OutputStream;
11 import java.net.InetAddress;
12 import java.net.InetSocketAddress;
13 import java.net.Socket;
14 import java.net.SocketTimeoutException;
15 import java.net.UnknownHostException;
16 import java.security.SecureRandom;
17 import java.util.List;
18 import java.util.Vector;
20 import ch.ethz.ssh2.ConnectionInfo;
21 import ch.ethz.ssh2.ConnectionMonitor;
22 import ch.ethz.ssh2.DHGexParameters;
23 import ch.ethz.ssh2.HTTPProxyData;
24 import ch.ethz.ssh2.HTTPProxyException;
25 import ch.ethz.ssh2.ProxyData;
26 import ch.ethz.ssh2.ServerHostKeyVerifier;
27 import ch.ethz.ssh2.crypto.Base64;
28 import ch.ethz.ssh2.crypto.CryptoWishList;
29 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
30 import ch.ethz.ssh2.crypto.digest.MAC;
31 import ch.ethz.ssh2.log.Logger;
32 import ch.ethz.ssh2.packets.PacketDisconnect;
33 import ch.ethz.ssh2.packets.Packets;
34 import ch.ethz.ssh2.packets.TypesReader;
35 import ch.ethz.ssh2.server.ServerConnectionState;
36 import ch.ethz.ssh2.signature.DSAPrivateKey;
37 import ch.ethz.ssh2.signature.RSAPrivateKey;
38 import ch.ethz.ssh2.util.StringEncoder;
39 import ch.ethz.ssh2.util.Tokenizer;
42 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
43 * packets are allowed during kex exchange, on the other side we need to blindly
44 * ignore the next _packet_ if the KEX guess was wrong. Where do we know from that
45 * the next packet is not a channel data packet? Yes, we could check if it is in
46 * the KEX range. But the standard says nothing about this. The OpenSSH guys
47 * block local "normal" traffic during KEX. That's fine - however, they assume
48 * that the other side is doing the same. During re-key, if they receive traffic
49 * other than KEX, they become horribly irritated and kill the connection. Since
50 * we are very likely going to communicate with OpenSSH servers, we have to play
51 * the same game - even though we could do better.
53 * btw: having stdout and stderr on the same channel, with a shared window, is
54 * also a VERY good idea... =(
60 * @author Christian Plattner
61 * @version $Id: TransportManager.java 47 2013-07-31 23:59:52Z cleondris@gmail.com $
63 public class TransportManager
65 private static final Logger log = Logger.getLogger(TransportManager.class);
/**
 * Binds a message handler to a range of SSH message type numbers.
 * Other code in this class reads three members of an entry: {@code mh}
 * (the handler whose handleMessage is invoked) and {@code low} / {@code high}
 * (compared against the packet type as an inclusive range).
 */
67 private static class HandlerEntry
/* Entries waiting to be sent by the asynchronous worker. The list object is
 * also the monitor used for wait/notifyAll between senders and the worker. */
74 private final List<AsynchronousEntry> asynchronousQueue = new Vector<AsynchronousEntry>();
/* The current worker thread, or null when there is none: sendAsynchronousMessage
 * creates one on demand and the worker resets this field to null when idle. */
75 private Thread asynchronousThread = null;
/* Set whenever an entry is queued; cleared by the worker only after it finds
 * the queue empty. sendMessage waits while this flag is true. */
76 private boolean asynchronousPending = false;
/**
 * One element of the asynchronous send queue, constructed from a message and
 * a Runnable. The worker reads them back as {@code msg} (handed to
 * sendMessageImmediate) and {@code run} (tested for null after the send;
 * presumably executed when non-null - that statement is not visible here).
 */
78 class AsynchronousEntry
83 public AsynchronousEntry(byte[] msg, Runnable run)
/**
 * Background thread that drains {@code asynchronousQueue}.
 * While holding the queue monitor it removes the entry at index 0; the entry's
 * message is then passed to {@code sendMessageImmediate} (per the comment
 * below, outside of that lock). When the queue is found empty the worker
 * clears {@code asynchronousPending}, wakes all waiters and waits up to
 * 2000 ms; if the queue is still empty afterwards it sets
 * {@code asynchronousThread} to null (presumably followed by leaving the
 * run loop - that statement is not visible in this listing).
 * After a send, {@code item.run} is checked for null; exceptions from the
 * guarded section that follows are caught as {@code Exception}.
 */
90 class AsynchronousWorker extends Thread
97 AsynchronousEntry item = null;
99 synchronized (asynchronousQueue)
101 if (asynchronousQueue.size() == 0)
103 /* Only now we may reset the flag, since we are sure that all queued items
104 * have been sent (there is a slight delay between de-queuing and sending,
105 * this is why we need this flag! See code below. Sending takes place outside
106 * of this lock, this is why a test for size()==0 (from another thread) does not ensure
107 * that all messages have been sent.
110 asynchronousPending = false;
112 /* Notify any senders that they can proceed, all async messages have been delivered */
114 asynchronousQueue.notifyAll();
116 /* After the queue is empty for about 2 seconds, stop this thread */
120 asynchronousQueue.wait(2000);
122 catch (InterruptedException ignore)
126 if (asynchronousQueue.size() == 0)
128 asynchronousThread = null;
133 item = asynchronousQueue.remove(0);
136 /* The following invocation may throw an IOException.
137 * There is no point in handling it - it simply means
138 * that the connection has a problem and we should stop
139 * sending asynchronously messages. We do not need to signal that
140 * we have exited (asynchronousThread = null): further
141 * messages in the queue cannot be sent by this or any
143 * Other threads will sooner or later (when receiving or
144 * sending the next message) get the same IOException and
145 * get to the same conclusion.
150 sendMessageImmediate(item.msg);
152 catch (IOException e)
157 if (item.run != null)
163 catch (Exception ignore)
/* The TCP socket. Starts out as an unconnected Socket; clientInit replaces it
 * with the socket returned by establishConnection. */
172 private Socket sock = new Socket();
/* Monitor guarding flagKexOngoing, connectionClosed and reasonClosedCause;
 * senders wait on it and close()/kexFinished() call notifyAll on it. */
174 private final Object connectionSemaphore = new Object();
/* Set to true by sendKexMessage and back to false by kexFinished. */
176 private boolean flagKexOngoing = false;
/* Set to true by close(); once set, the send methods throw an IOException. */
177 private boolean connectionClosed = false;
/* The cause passed to close(); may be null. */
179 private Throwable reasonClosedCause = null;
/* Packet-level connection, created in clientInit / serverInit from the socket streams. */
181 private TransportConnection tc;
/* Key exchange manager (ClientKexManager or ServerKexManager), created in clientInit / serverInit. */
182 private KexManager km;
/* Registered handlers; receiveLoop dispatches non-KEX packets to the first entry whose range matches. */
184 private final List<HandlerEntry> messageHandlers = new Vector<HandlerEntry>();
/* Thread created by startReceiver; sendMessageImmediate refuses to run on it. */
186 private Thread receiveThread;
/* Monitors to be informed by close(); replaced wholesale by setConnectionMonitors. */
188 private List<ConnectionMonitor> connectionMonitors = new Vector<ConnectionMonitor>();
/* Flipped to true by close() so that the monitors are informed only once. */
189 private boolean monitorsWereInformed = false;
192 * There were reports that there are JDKs which use
193 * the resolver even though one supplies a dotted IP
194 * address in the Socket constructor. That is why we
195 * try to generate the InetAddress "by hand".
198 * @return the InetAddress
199 * @throws UnknownHostException
// Tries parseIPv4Address(host) first and ends with a resolver lookup through
// InetAddress.getByName(host). The test on the parsed result is not visible in
// this listing - presumably a successfully parsed address is returned directly.
201 private static InetAddress createInetAddress(String host) throws UnknownHostException
203 /* Check if it is a dotted IP4 address */
205 InetAddress addr = parseIPv4Address(host);
212 return InetAddress.getByName(host);
/**
 * Builds an InetAddress from a dotted-quad IPv4 string without a resolver lookup.
 * The host is split on '.' and must yield exactly four tokens; each token must
 * be 1 to 3 characters long, consist only of the ASCII digits '0'-'9', and
 * evaluate to at most 255. The four values become the address bytes passed to
 * {@code InetAddress.getByAddress(host, addr)}.
 * NOTE(review): the statements executed when a check fails are not visible in
 * this listing - presumably the method returns null so that the caller can
 * fall back to a normal lookup; confirm.
 *
 * @param host candidate dotted IPv4 address
 * @return the address built from the four parsed bytes
 * @throws UnknownHostException declared; propagated from InetAddress.getByAddress
 */
215 private static InetAddress parseIPv4Address(String host) throws UnknownHostException
222 String[] quad = Tokenizer.parseTokens(host, '.');
224 if ((quad == null) || (quad.length != 4))
229 byte[] addr = new byte[4];
231 for (int i = 0; i < 4; i++)
235 if ((quad[i].length() == 0) || (quad[i].length() > 3))
240 for (int k = 0; k < quad[i].length(); k++)
242 char c = quad[i].charAt(k);
244 /* No, Character.isDigit is not the same */
245 if ((c < '0') || (c > '9'))
250 part = part * 10 + (c - '0');
253 if (part > 255) /* 300.1.2.3 is invalid =) */
258 addr[i] = (byte) part;
261 return InetAddress.getByAddress(host, addr);
/**
 * Returns the packet overhead estimate reported by the transport connection.
 *
 * @return the value of {@code tc.getPacketOverheadEstimate()}
 */
264 public int getPacketOverheadEstimate()
266 return tc.getPacketOverheadEstimate();
/**
 * Forwards the TCP_NODELAY setting to the underlying socket.
 *
 * @param state passed unchanged to {@code Socket.setTcpNoDelay}
 * @throws IOException if the socket call fails
 */
269 public void setTcpNoDelay(boolean state) throws IOException
271 sock.setTcpNoDelay(state);
/**
 * Forwards the read timeout to the underlying socket.
 *
 * @param timeout passed unchanged to {@code Socket.setSoTimeout}
 * @throws IOException if the socket call fails
 */
274 public void setSoTimeout(int timeout) throws IOException
276 sock.setSoTimeout(timeout);
/**
 * Delegates to {@code km.getOrWaitForConnectionInfo(kexNumber)}.
 * NOTE(review): judging by the callee's name this may block until the given
 * key exchange has completed - confirm in KexManager.
 *
 * @param kexNumber passed unchanged to the key exchange manager
 * @return the ConnectionInfo supplied by the key exchange manager
 * @throws IOException propagated from the key exchange manager
 */
279 public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException
281 return km.getOrWaitForConnectionInfo(kexNumber);
/**
 * Returns the cause recorded by close(), read while holding connectionSemaphore.
 *
 * @return the recorded cause; null initially, and also when close() was given a null cause
 */
284 public Throwable getReasonClosedCause()
286 synchronized (connectionSemaphore)
288 return reasonClosedCause;
/**
 * Returns the session identifier as a byte array.
 * NOTE(review): the method body is not visible in this listing - presumably
 * the value comes from the key exchange manager; confirm.
 */
292 public byte[] getSessionIdentifier()
/**
 * Closes the connection and informs the connection monitors.
 * With {@code useDisconnectPacket == false} a hard shutdown is performed first,
 * deliberately without holding connectionSemaphore (the shutdown statement
 * itself is not visible in this listing; an IOException from it is ignored).
 * Then, under connectionSemaphore and only if the connection is not yet marked
 * closed: when {@code useDisconnectPacket} is true a PacketDisconnect with
 * SSH_DISCONNECT_BY_APPLICATION and {@code cause.getMessage()} is built
 * (IOExceptions ignored); the connection is marked closed, the cause is
 * recorded and all waiters on the semaphore are woken.
 * Finally, the first call that finds {@code monitorsWereInformed} false sets it
 * and calls {@code connectionLost(reasonClosedCause)} on a snapshot of the
 * monitors, catching any Exception a monitor throws.
 * NOTE(review): {@code cause} is dereferenced via {@code cause.getMessage()}
 * when useDisconnectPacket is true, although the assignment below documents
 * that cause may be null - only IOException is caught there, so confirm that
 * callers never pass a null cause together with useDisconnectPacket == true.
 *
 * @param cause reason for closing; stored and handed to the monitors
 * @param useDisconnectPacket false selects the hard shutdown path, true the disconnect packet path
 */
297 public void close(Throwable cause, boolean useDisconnectPacket)
299 if (useDisconnectPacket == false)
301 /* OK, hard shutdown - do not acquire the semaphore,
302 * perhaps somebody is inside (and waits until the remote
303 * side is ready to accept new data). */
309 catch (IOException ignore)
313 /* OK, whoever tried to send data, should now agree that
314 * there is no point in further waiting =)
315 * It is safe now to acquire the semaphore.
319 synchronized (connectionSemaphore)
321 if (connectionClosed == false)
323 if (useDisconnectPacket == true)
327 byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "")
334 catch (IOException ignore)
342 catch (IOException ignore)
347 connectionClosed = true;
348 reasonClosedCause = cause; /* may be null */
350 connectionSemaphore.notifyAll();
353 /* Now check if we need to inform the monitors */
355 List<ConnectionMonitor> monitors = new Vector<ConnectionMonitor>();
359 /* Short term lock to protect "connectionMonitors"
360 * and "monitorsWereInformed"
361 * (they may be modified concurrently)
364 if (monitorsWereInformed == false)
366 monitorsWereInformed = true;
367 monitors.addAll(connectionMonitors);
371 for (ConnectionMonitor cmon : monitors)
375 cmon.connectionLost(reasonClosedCause);
377 catch (Exception ignore)
/**
 * Opens the TCP connection, either directly or through an HTTP proxy.
 * Without proxy data the host is turned into an InetAddress via
 * createInetAddress and a new Socket is connected with the given timeout.
 * For HTTPProxyData the socket is connected to the proxy instead and an
 * "HTTP/1.0" CONNECT request is written, with a "Proxy-Authorization: Basic"
 * header when both proxyUser and proxyPass are set, plus any non-null entries
 * of requestHeaderLines. The first response line must start with "HTTP/", be
 * at least 14 characters long with spaces at index 8 and 12, and carry a
 * numeric status at index 9-11; a status other than 200 raises
 * HTTPProxyException with the remainder of the line. Further lines are then
 * read via ClientServerHello.readLineRN (per the comment, until an empty line).
 * Any other ProxyData type is rejected with an IOException.
 * NOTE(review): none of the visible failure paths closes the socket that was
 * just opened - confirm whether cleanup happens elsewhere.
 *
 * @param hostname target SSH host
 * @param port target SSH port
 * @param proxyData null for a direct connection, otherwise the proxy description
 * @param connectTimeout timeout handed to {@code Socket.connect}
 */
383 private static Socket establishConnection(String hostname, int port, ProxyData proxyData, int connectTimeout)
386 /* See the comment for createInetAddress() */
388 if (proxyData == null)
390 InetAddress addr = createInetAddress(hostname);
391 Socket s = new Socket();
392 s.connect(new InetSocketAddress(addr, port), connectTimeout);
396 if (proxyData instanceof HTTPProxyData)
398 HTTPProxyData pd = (HTTPProxyData) proxyData;
400 /* At the moment, we only support HTTP proxies */
402 InetAddress addr = createInetAddress(pd.proxyHost);
403 Socket s = new Socket();
404 s.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout);
406 /* OK, now tell the proxy where we actually want to connect to */
408 StringBuilder sb = new StringBuilder();
410 sb.append("CONNECT ");
414 sb.append(" HTTP/1.0\r\n");
416 if ((pd.proxyUser != null) && (pd.proxyPass != null))
418 String credentials = pd.proxyUser + ":" + pd.proxyPass;
419 char[] encoded = Base64.encode(StringEncoder.GetBytes(credentials));
420 sb.append("Proxy-Authorization: Basic ");
425 if (pd.requestHeaderLines != null)
427 for (int i = 0; i < pd.requestHeaderLines.length; i++)
429 if (pd.requestHeaderLines[i] != null)
431 sb.append(pd.requestHeaderLines[i]);
439 OutputStream out = s.getOutputStream();
441 out.write(StringEncoder.GetBytes(sb.toString()));
444 /* Now parse the HTTP response */
446 byte[] buffer = new byte[1024];
447 InputStream in = s.getInputStream();
449 int len = ClientServerHello.readLineRN(in, buffer);
451 String httpReponse = StringEncoder.GetString(buffer, 0, len);
453 if (httpReponse.startsWith("HTTP/") == false)
455 throw new IOException("The proxy did not send back a valid HTTP response.");
458 /* "HTTP/1.X XYZ X" => 14 characters minimum */
460 if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' '))
462 throw new IOException("The proxy did not send back a valid HTTP response.");
469 errorCode = Integer.parseInt(httpReponse.substring(9, 12));
471 catch (NumberFormatException ignore)
473 throw new IOException("The proxy did not send back a valid HTTP response.");
476 if ((errorCode < 0) || (errorCode > 999))
478 throw new IOException("The proxy did not send back a valid HTTP response.");
481 if (errorCode != 200)
483 throw new HTTPProxyException(httpReponse.substring(13), errorCode);
486 /* OK, read until empty line */
490 len = ClientServerHello.readLineRN(in, buffer);
499 throw new IOException("Unsupported ProxyData");
/**
 * Creates and starts the daemon receive thread.
 * The visible parts of its Runnable log a warning with the exception message
 * when receiveLoop fails, log its return at debug level, and then signal
 * shutdown by calling {@code handleMessage(null, 0)} on the key exchange
 * manager and on every registered handler; exceptions from those calls are
 * caught. NOTE(review): the call into receiveLoop and the close() that
 * presumably follows a failure are not visible in this listing.
 *
 * @throws IOException declared; no throwing statement is visible in this listing
 */
502 private void startReceiver() throws IOException
504 receiveThread = new Thread(new Runnable()
516 log.warning("Receive thread: error in receiveLoop: " + e.getMessage());
519 if (log.isDebugEnabled())
521 log.debug("Receive thread: back from receiveLoop");
524 /* Tell all handlers that it is time to say goodbye */
530 km.handleMessage(null, 0);
532 catch (IOException ignored)
537 for (HandlerEntry he : messageHandlers)
541 he.mh.handleMessage(null, 0);
543 catch (Exception ignore)
550 receiveThread.setDaemon(true);
551 receiveThread.start();
/**
 * Initializes this transport as an SSH client.
 * Establishes the TCP connection (optionally through a proxy), exchanges the
 * identification lines via ClientServerHello.clientHello, wraps the socket
 * streams in a TransportConnection, creates a ClientKexManager and initiates
 * the first key exchange with the given wish list and DH group exchange
 * parameters. NOTE(review): the statements after initiateKEX are not visible
 * in this listing - presumably the receiver thread is started there; confirm.
 *
 * @throws IOException if connecting, the hello exchange or starting the key exchange fails
 */
554 public void clientInit(String hostname, int port, String softwareversion, CryptoWishList cwl,
555 ServerHostKeyVerifier verifier, DHGexParameters dhgex, int connectTimeout, SecureRandom rnd,
556 ProxyData proxyData) throws IOException
558 /* First, establish the TCP connection to the SSH-2 server */
560 sock = establishConnection(hostname, port, proxyData, connectTimeout);
562 /* Parse the server line and say hello - important: this information is later needed for the
563 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
567 ClientServerHello csh = ClientServerHello.clientHello(softwareversion, sock.getInputStream(),
568 sock.getOutputStream());
570 tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
572 km = new ClientKexManager(this, csh, cwl, hostname, port, verifier, rnd);
573 km.initiateKEX(cwl, dhgex, null, null);
/**
 * Initializes this transport as an SSH server on an already established TCP connection.
 * Exchanges the identification lines via ClientServerHello.serverHello (the
 * result is stored in {@code state.csh}), wraps the socket streams in a
 * TransportConnection using {@code state.generator}, creates a
 * ServerKexManager and initiates the first key exchange with the wish list and
 * DSA/RSA keys taken from the state. NOTE(review): how {@code sock} is obtained
 * and what follows initiateKEX are not visible in this listing.
 *
 * @param state server-side connection state supplying version string, keys and RNG
 * @throws IOException if the hello exchange or starting the key exchange fails
 */
578 public void serverInit(ServerConnectionState state) throws IOException
580 /* TCP connection is already established */
584 /* Parse the client line and say hello - important: this information is later needed for the
585 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
589 state.csh = ClientServerHello.serverHello(state.softwareversion, sock.getInputStream(), sock.getOutputStream());
591 tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), state.generator);
593 km = new ServerKexManager(state);
594 km.initiateKEX(state.next_cryptoWishList, null, state.next_dsa_key, state.next_rsa_key);
/**
 * Registers a handler by appending a new HandlerEntry to {@code messageHandlers}
 * while holding that list's monitor. NOTE(review): the assignments that copy
 * mh, low and high into the entry are not visible in this listing.
 *
 * @param mh handler to register
 * @param low lower bound of the message type range (receiveLoop treats the range as inclusive)
 * @param high upper bound of the message type range (inclusive)
 */
599 public void registerMessageHandler(MessageHandler mh, int low, int high)
601 HandlerEntry he = new HandlerEntry();
606 synchronized (messageHandlers)
608 messageHandlers.add(he);
/**
 * Removes a registered handler entry while holding the {@code messageHandlers} monitor.
 * An entry matches when its handler is the same object ({@code ==}) as
 * {@code mh} and both bounds are equal to the given values. NOTE(review):
 * whether the scan stops after the first match is not visible in this listing.
 *
 * @param mh handler instance that was registered
 * @param low lower bound used at registration
 * @param high upper bound used at registration
 */
612 public void removeMessageHandler(MessageHandler mh, int low, int high)
614 synchronized (messageHandlers)
616 for (int i = 0; i < messageHandlers.size(); i++)
618 HandlerEntry he = messageHandlers.get(i);
619 if ((he.mh == mh) && (he.low == low) && (he.high == high))
621 messageHandlers.remove(i);
/**
 * Sends a key exchange message and marks a key exchange as ongoing.
 * Under connectionSemaphore: fails with an IOException (carrying
 * reasonClosedCause as its cause) if the connection is closed, otherwise sets
 * {@code flagKexOngoing}. NOTE(review): the send itself and the body of the
 * IOException handler are not visible in this listing.
 *
 * @param msg the key exchange packet payload
 * @throws IOException if the connection is already closed
 */
628 public void sendKexMessage(byte[] msg) throws IOException
630 synchronized (connectionSemaphore)
632 if (connectionClosed)
634 throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
637 flagKexOngoing = true;
643 catch (IOException e)
/**
 * Marks the key exchange as finished: clears {@code flagKexOngoing} and wakes
 * every thread waiting on connectionSemaphore (sendMessageImmediate waits on
 * that monitor).
 *
 * @throws IOException declared; nothing in the visible body throws it
 */
651 public void kexFinished() throws IOException
653 synchronized (connectionSemaphore)
655 flagKexOngoing = false;
656 connectionSemaphore.notifyAll();
664 * @param dsa may be null if this is a client connection
665 * @param rsa may be null if this is a client connection
666 * @throws IOException
// Triggers a new key exchange via km.initiateKEX(cwl, dhgex, dsa, rsa).
// The closed-connection check runs under connectionSemaphore and throws an
// IOException whose cause is reasonClosedCause.
668 public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, DSAPrivateKey dsa, RSAPrivateKey rsa)
671 synchronized (connectionSemaphore)
673 if (connectionClosed)
674 /* Inform the caller that there is no point in triggering a new kex */
675 throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
678 km.initiateKEX(cwl, dhgex, dsa, rsa);
/**
 * Forwards the cipher and MAC for the receiving direction to the transport connection.
 *
 * @param bc block cipher passed to {@code tc.changeRecvCipher}
 * @param mac MAC passed to {@code tc.changeRecvCipher}
 */
681 public void changeRecvCipher(BlockCipher bc, MAC mac)
683 tc.changeRecvCipher(bc, mac);
/**
 * Forwards the cipher and MAC for the sending direction to the transport connection.
 *
 * @param bc block cipher passed to {@code tc.changeSendCipher}
 * @param mac MAC passed to {@code tc.changeSendCipher}
 */
686 public void changeSendCipher(BlockCipher bc, MAC mac)
688 tc.changeSendCipher(bc, mac);
/**
 * Queues a message for asynchronous sending; equivalent to calling the
 * two-argument overload with a null Runnable.
 *
 * @param msg the packet payload to queue
 * @throws IOException propagated from the two-argument overload
 */
691 public void sendAsynchronousMessage(byte[] msg) throws IOException
693 sendAsynchronousMessage(msg, null);
/**
 * Queues a message for the asynchronous worker thread.
 * Under the queue monitor: appends a new entry, sets
 * {@code asynchronousPending}, fails if the queue now holds more than 100
 * entries, creates and starts a daemon AsynchronousWorker when
 * {@code asynchronousThread} is null, and finally notifies all threads waiting
 * on the queue.
 * Note that the entry is added before the size check, so the message stays in
 * the queue even when the IOException is thrown.
 *
 * @param msg the packet payload to queue
 * @param run Runnable stored with the entry; may be null (the one-argument overload passes null)
 * @throws IOException if more than 100 entries are queued
 */
696 public void sendAsynchronousMessage(byte[] msg, Runnable run) throws IOException
698 synchronized (asynchronousQueue)
700 asynchronousQueue.add(new AsynchronousEntry(msg, run));
701 asynchronousPending = true;
703 /* This limit should be flexible enough. We need this, otherwise the peer
704 * can flood us with global requests (and other stuff where we have to reply
705 * with an asynchronous message) and (if the server just sends data and does not
706 * read what we send) this will probably put us in a low memory situation
707 * (our send queue would grow and grow and...) */
709 if (asynchronousQueue.size() > 100)
711 throw new IOException("Error: the peer is not consuming our asynchronous replies.");
714 /* Check if we have an asynchronous sending thread */
716 if (asynchronousThread == null)
718 asynchronousThread = new AsynchronousWorker();
719 asynchronousThread.setDaemon(true);
720 asynchronousThread.start();
722 /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
725 asynchronousQueue.notifyAll();
/**
 * Replaces the connection monitors with a fresh Vector holding a copy of the
 * given list, so later changes to the caller's list are not seen.
 * NOTE(review): a comment in close() says this field may be modified
 * concurrently; any lock taken here is not visible in this listing.
 *
 * @param monitors monitors to install; must not be null, since {@code addAll} rejects null
 */
729 public void setConnectionMonitors(List<ConnectionMonitor> monitors)
733 connectionMonitors = new Vector<ConnectionMonitor>();
734 connectionMonitors.addAll(monitors);
739 * True if no response message expected.
741 private boolean idle;
744 * Send a message but ensure that all queued messages are being sent first.
747 * @throws IOException
// Blocks on the asynchronousQueue monitor, in waits of at most 1000 ms, for as
// long as asynchronousPending is set, and then hands msg to sendMessageImmediate.
// NOTE(review): what happens on InterruptedException is not visible in this listing.
749 public void sendMessage(byte[] msg) throws IOException
751 synchronized (asynchronousQueue)
753 while (asynchronousPending)
757 asynchronousQueue.wait(1000);
759 catch (InterruptedException e)
765 sendMessageImmediate(msg);
769 * Send message, ignore queued async messages that have not been delivered yet.
770 * Will be called directly from the asynchronousThread thread.
773 * @throws IOException
// Refuses to run on the receive thread (throws an IOException). Otherwise,
// under connectionSemaphore, it throws if the connection is closed and waits on
// the semaphore while a key exchange is ongoing. An InterruptedException during
// that wait is only recorded in wasInterrupted, and the interrupt status is
// restored later through Thread.currentThread().interrupt().
// NOTE(review): the actual send and the IOException handler body are not
// visible in this listing.
775 public void sendMessageImmediate(byte[] msg) throws IOException
777 if (Thread.currentThread() == receiveThread)
779 throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!");
782 boolean wasInterrupted = false;
786 synchronized (connectionSemaphore)
790 if (connectionClosed)
792 throw (IOException) new IOException("Sorry, this connection is closed.")
793 .initCause(reasonClosedCause);
796 if (flagKexOngoing == false)
803 connectionSemaphore.wait();
805 catch (InterruptedException e)
807 wasInterrupted = true;
816 catch (IOException e)
826 Thread.currentThread().interrupt();
/**
 * Reads packets from the transport connection and dispatches them by message type.
 * Packets are read into a 35000-byte buffer with {@code tc.receiveMessage};
 * the type is the first byte, taken as an unsigned value.
 * <ul>
 * <li>SSH_MSG_IGNORE: tested for explicitly (the handling is not visible in this
 *     listing; presumably the packet is skipped).</li>
 * <li>SSH_MSG_DEBUG: when debug logging is enabled, the text is logged with every
 *     character outside ASCII 32-126 replaced by U+FFFD.</li>
 * <li>SSH_MSG_UNIMPLEMENTED: raises an IOException.</li>
 * <li>SSH_MSG_DISCONNECT: raises an IOException carrying the reason code and the
 *     reason text, cut to 255 characters ending in "..." and sanitized the same way.</li>
 * <li>SSH_MSG_KEXINIT, SSH_MSG_NEWKEYS and types 30-49: passed to the key exchange manager.</li>
 * <li>Anything else: passed to a registered handler whose inclusive [low, high]
 *     range contains the type; without a handler an IOException
 *     "Unexpected SSH message" is raised.</li>
 * </ul>
 * A SocketTimeoutException from the read is caught separately; the conditions
 * under which it is ignored (see the debug message) are not visible in this listing.
 * NOTE(review): the handler lookup reads {@code messageHandlers} by index with no
 * visible lock, while register/removeMessageHandler synchronize on that list -
 * confirm that a concurrent removal cannot invalidate the index.
 *
 * @throws IOException on read errors, protocol violations or a DISCONNECT from the peer
 */
830 public void receiveLoop() throws IOException
832 byte[] msg = new byte[35000];
839 msglen = tc.receiveMessage(msg, 0, msg.length);
841 catch (SocketTimeoutException e)
846 log.debug("Ignoring socket timeout");
853 int type = msg[0] & 0xff;
855 if (type == Packets.SSH_MSG_IGNORE)
860 if (type == Packets.SSH_MSG_DEBUG)
862 if (log.isDebugEnabled())
864 TypesReader tr = new TypesReader(msg, 0, msglen);
867 StringBuilder debugMessageBuffer = new StringBuilder();
868 debugMessageBuffer.append(tr.readString("UTF-8"));
870 for (int i = 0; i < debugMessageBuffer.length(); i++)
872 char c = debugMessageBuffer.charAt(i);
874 if ((c >= 32) && (c <= 126))
878 debugMessageBuffer.setCharAt(i, '\uFFFD');
881 log.debug("DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'");
886 if (type == Packets.SSH_MSG_UNIMPLEMENTED)
888 throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen.");
891 if (type == Packets.SSH_MSG_DISCONNECT)
893 TypesReader tr = new TypesReader(msg, 0, msglen);
895 int reason_code = tr.readUINT32();
896 StringBuilder reasonBuffer = new StringBuilder();
897 reasonBuffer.append(tr.readString("UTF-8"));
900 * Do not get fooled by servers that send abnormal long error
904 if (reasonBuffer.length() > 255)
906 reasonBuffer.setLength(255);
907 reasonBuffer.setCharAt(254, '.');
908 reasonBuffer.setCharAt(253, '.');
909 reasonBuffer.setCharAt(252, '.');
913 * Also, check that the server did not send characters that may
914 * screw up the receiver -> restrict to reasonable US-ASCII
915 * subset -> "printable characters" (ASCII 32 - 126). Replace
916 * all others with 0xFFFD (UNICODE replacement character).
919 for (int i = 0; i < reasonBuffer.length(); i++)
921 char c = reasonBuffer.charAt(i);
923 if ((c >= 32) && (c <= 126))
927 reasonBuffer.setCharAt(i, '\uFFFD');
930 throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): "
931 + reasonBuffer.toString());
935 * Is it a KEX Packet?
938 if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS)
939 || ((type >= 30) && (type <= 49)))
941 km.handleMessage(msg, msglen);
945 MessageHandler mh = null;
947 for (int i = 0; i < messageHandlers.size(); i++)
949 HandlerEntry he = messageHandlers.get(i);
950 if ((he.low <= type) && (type <= he.high))
959 throw new IOException("Unexpected SSH message (type " + type + ")");
962 mh.handleMessage(msg, msglen);