Merge "Initial implementation of the ClusteredDataStore"
[controller.git] / third-party / ganymed / src / main / java / ch / ethz / ssh2 / transport / TransportManager.java
1 /*
2  * Copyright (c) 2006-2013 Christian Plattner. All rights reserved.
3  * Please refer to the LICENSE.txt for licensing details.
4  */
5
6 package ch.ethz.ssh2.transport;
7
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.OutputStream;
11 import java.net.InetAddress;
12 import java.net.InetSocketAddress;
13 import java.net.Socket;
14 import java.net.SocketTimeoutException;
15 import java.net.UnknownHostException;
16 import java.security.SecureRandom;
17 import java.util.List;
18 import java.util.Vector;
19
20 import ch.ethz.ssh2.ConnectionInfo;
21 import ch.ethz.ssh2.ConnectionMonitor;
22 import ch.ethz.ssh2.DHGexParameters;
23 import ch.ethz.ssh2.HTTPProxyData;
24 import ch.ethz.ssh2.HTTPProxyException;
25 import ch.ethz.ssh2.ProxyData;
26 import ch.ethz.ssh2.ServerHostKeyVerifier;
27 import ch.ethz.ssh2.crypto.Base64;
28 import ch.ethz.ssh2.crypto.CryptoWishList;
29 import ch.ethz.ssh2.crypto.cipher.BlockCipher;
30 import ch.ethz.ssh2.crypto.digest.MAC;
31 import ch.ethz.ssh2.log.Logger;
32 import ch.ethz.ssh2.packets.PacketDisconnect;
33 import ch.ethz.ssh2.packets.Packets;
34 import ch.ethz.ssh2.packets.TypesReader;
35 import ch.ethz.ssh2.server.ServerConnectionState;
36 import ch.ethz.ssh2.signature.DSAPrivateKey;
37 import ch.ethz.ssh2.signature.RSAPrivateKey;
38 import ch.ethz.ssh2.util.StringEncoder;
39 import ch.ethz.ssh2.util.Tokenizer;
40
/*
 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
 * packets are allowed during key exchange, on the other side we need to blindly
 * ignore the next _packet_ if the KEX guess was wrong. How do we know that
 * the next packet is not a channel data packet? Yes, we could check if it is in
 * the KEX range. But the standard says nothing about this. The OpenSSH guys
 * block local "normal" traffic during KEX. That's fine - however, they assume
 * that the other side is doing the same. During re-key, if they receive traffic
 * other than KEX, they become horribly irritated and kill the connection. Since
 * we are very likely going to communicate with OpenSSH servers, we have to play
 * the same game - even though we could do better.
 *
 * btw: having stdout and stderr on the same channel, with a shared window, is
 * also a VERY good idea... =(
 */
56
57 /**
58  * TransportManager.
59  *
60  * @author Christian Plattner
61  * @version $Id: TransportManager.java 47 2013-07-31 23:59:52Z cleondris@gmail.com $
62  */
63 public class TransportManager
64 {
65     private static final Logger log = Logger.getLogger(TransportManager.class);
66
67     private static class HandlerEntry
68     {
69         MessageHandler mh;
70         int low;
71         int high;
72     }
73
74     private final List<AsynchronousEntry> asynchronousQueue = new Vector<AsynchronousEntry>();
75     private Thread asynchronousThread = null;
76     private boolean asynchronousPending = false;
77
78     class AsynchronousEntry
79     {
80         public byte[] msg;
81         public Runnable run;
82
83         public AsynchronousEntry(byte[] msg, Runnable run)
84         {
85             this.msg = msg;
86             this.run = run;
87         }
88     }
89
90     class AsynchronousWorker extends Thread
91     {
92         @Override
93         public void run()
94         {
95             while (true)
96             {
97                 AsynchronousEntry item = null;
98
99                 synchronized (asynchronousQueue)
100                 {
101                     if (asynchronousQueue.size() == 0)
102                     {
103                                                 /* Only now we may reset the flag, since we are sure that all queued items
104                                                  * have been sent (there is a slight delay between de-queuing and sending,
105                                                  * this is why we need this flag! See code below. Sending takes place outside
106                                                  * of this lock, this is why a test for size()==0 (from another thread) does not ensure
107                                                  * that all messages have been sent.
108                                                  */
109
110                         asynchronousPending = false;
111
112                                                 /* Notify any senders that they can proceed, all async messages have been delivered */
113
114                         asynchronousQueue.notifyAll();
115
116                                                 /* After the queue is empty for about 2 seconds, stop this thread */
117
118                         try
119                         {
120                             asynchronousQueue.wait(2000);
121                         }
122                         catch (InterruptedException ignore)
123                         {
124                         }
125
126                         if (asynchronousQueue.size() == 0)
127                         {
128                             asynchronousThread = null;
129                             return;
130                         }
131                     }
132
133                     item = asynchronousQueue.remove(0);
134                 }
135
136                                 /* The following invocation may throw an IOException.
137                                  * There is no point in handling it - it simply means
138                                  * that the connection has a problem and we should stop
139                                  * sending asynchronously messages. We do not need to signal that
140                                  * we have exited (asynchronousThread = null): further
141                                  * messages in the queue cannot be sent by this or any
142                                  * other thread.
143                                  * Other threads will sooner or later (when receiving or
144                                  * sending the next message) get the same IOException and
145                                  * get to the same conclusion.
146                                  */
147
148                 try
149                 {
150                     sendMessageImmediate(item.msg);
151                 }
152                 catch (IOException e)
153                 {
154                     return;
155                 }
156
157                 if (item.run != null)
158                 {
159                     try
160                     {
161                         item.run.run();
162                     }
163                     catch (Exception ignore)
164                     {
165                     }
166
167                 }
168             }
169         }
170     }
171
172     private Socket sock = new Socket();
173
174     private final Object connectionSemaphore = new Object();
175
176     private boolean flagKexOngoing = false;
177     private boolean connectionClosed = false;
178
179     private Throwable reasonClosedCause = null;
180
181     private TransportConnection tc;
182     private KexManager km;
183
184     private final List<HandlerEntry> messageHandlers = new Vector<HandlerEntry>();
185
186     private Thread receiveThread;
187
188     private List<ConnectionMonitor> connectionMonitors = new Vector<ConnectionMonitor>();
189     private boolean monitorsWereInformed = false;
190
191     /**
192      * There were reports that there are JDKs which use
193      * the resolver even though one supplies a dotted IP
194      * address in the Socket constructor. That is why we
195      * try to generate the InetAdress "by hand".
196      *
197      * @param host
198      * @return the InetAddress
199      * @throws UnknownHostException
200      */
201     private static InetAddress createInetAddress(String host) throws UnknownHostException
202     {
203                 /* Check if it is a dotted IP4 address */
204
205         InetAddress addr = parseIPv4Address(host);
206
207         if (addr != null)
208         {
209             return addr;
210         }
211
212         return InetAddress.getByName(host);
213     }
214
215     private static InetAddress parseIPv4Address(String host) throws UnknownHostException
216     {
217         if (host == null)
218         {
219             return null;
220         }
221
222         String[] quad = Tokenizer.parseTokens(host, '.');
223
224         if ((quad == null) || (quad.length != 4))
225         {
226             return null;
227         }
228
229         byte[] addr = new byte[4];
230
231         for (int i = 0; i < 4; i++)
232         {
233             int part = 0;
234
235             if ((quad[i].length() == 0) || (quad[i].length() > 3))
236             {
237                 return null;
238             }
239
240             for (int k = 0; k < quad[i].length(); k++)
241             {
242                 char c = quad[i].charAt(k);
243
244                                 /* No, Character.isDigit is not the same */
245                 if ((c < '0') || (c > '9'))
246                 {
247                     return null;
248                 }
249
250                 part = part * 10 + (c - '0');
251             }
252
253             if (part > 255) /* 300.1.2.3 is invalid =) */
254             {
255                 return null;
256             }
257
258             addr[i] = (byte) part;
259         }
260
261         return InetAddress.getByAddress(host, addr);
262     }
263
264     public int getPacketOverheadEstimate()
265     {
266         return tc.getPacketOverheadEstimate();
267     }
268
269     public void setTcpNoDelay(boolean state) throws IOException
270     {
271         sock.setTcpNoDelay(state);
272     }
273
274     public void setSoTimeout(int timeout) throws IOException
275     {
276         sock.setSoTimeout(timeout);
277     }
278
279     public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException
280     {
281         return km.getOrWaitForConnectionInfo(kexNumber);
282     }
283
284     public Throwable getReasonClosedCause()
285     {
286         synchronized (connectionSemaphore)
287         {
288             return reasonClosedCause;
289         }
290     }
291
292     public byte[] getSessionIdentifier()
293     {
294         return km.sessionId;
295     }
296
297     public void close(Throwable cause, boolean useDisconnectPacket)
298     {
299         if (useDisconnectPacket == false)
300         {
301                         /* OK, hard shutdown - do not aquire the semaphore,
302                          * perhaps somebody is inside (and waits until the remote
303                          * side is ready to accept new data). */
304
305             try
306             {
307                 sock.close();
308             }
309             catch (IOException ignore)
310             {
311             }
312
313                         /* OK, whoever tried to send data, should now agree that
314                          * there is no point in further waiting =)
315                          * It is safe now to aquire the semaphore.
316                          */
317         }
318
319         synchronized (connectionSemaphore)
320         {
321             if (connectionClosed == false)
322             {
323                 if (useDisconnectPacket == true)
324                 {
325                     try
326                     {
327                         byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "")
328                                 .getPayload();
329                         if (tc != null)
330                         {
331                             tc.sendMessage(msg);
332                         }
333                     }
334                     catch (IOException ignore)
335                     {
336                     }
337
338                     try
339                     {
340                         sock.close();
341                     }
342                     catch (IOException ignore)
343                     {
344                     }
345                 }
346
347                 connectionClosed = true;
348                 reasonClosedCause = cause; /* may be null */
349             }
350             connectionSemaphore.notifyAll();
351         }
352
353                 /* No check if we need to inform the monitors */
354
355         List<ConnectionMonitor> monitors = new Vector<ConnectionMonitor>();
356
357         synchronized (this)
358         {
359                         /* Short term lock to protect "connectionMonitors"
360                          * and "monitorsWereInformed"
361                          * (they may be modified concurrently)
362                          */
363
364             if (monitorsWereInformed == false)
365             {
366                 monitorsWereInformed = true;
367                 monitors.addAll(connectionMonitors);
368             }
369         }
370
371         for (ConnectionMonitor cmon : monitors)
372         {
373             try
374             {
375                 cmon.connectionLost(reasonClosedCause);
376             }
377             catch (Exception ignore)
378             {
379             }
380         }
381     }
382
383     private static Socket establishConnection(String hostname, int port, ProxyData proxyData, int connectTimeout)
384             throws IOException
385     {
386                 /* See the comment for createInetAddress() */
387
388         if (proxyData == null)
389         {
390             InetAddress addr = createInetAddress(hostname);
391             Socket s = new Socket();
392             s.connect(new InetSocketAddress(addr, port), connectTimeout);
393             return s;
394         }
395
396         if (proxyData instanceof HTTPProxyData)
397         {
398             HTTPProxyData pd = (HTTPProxyData) proxyData;
399
400                         /* At the moment, we only support HTTP proxies */
401
402             InetAddress addr = createInetAddress(pd.proxyHost);
403             Socket s = new Socket();
404             s.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout);
405
406                         /* OK, now tell the proxy where we actually want to connect to */
407
408             StringBuilder sb = new StringBuilder();
409
410             sb.append("CONNECT ");
411             sb.append(hostname);
412             sb.append(':');
413             sb.append(port);
414             sb.append(" HTTP/1.0\r\n");
415
416             if ((pd.proxyUser != null) && (pd.proxyPass != null))
417             {
418                 String credentials = pd.proxyUser + ":" + pd.proxyPass;
419                 char[] encoded = Base64.encode(StringEncoder.GetBytes(credentials));
420                 sb.append("Proxy-Authorization: Basic ");
421                 sb.append(encoded);
422                 sb.append("\r\n");
423             }
424
425             if (pd.requestHeaderLines != null)
426             {
427                 for (int i = 0; i < pd.requestHeaderLines.length; i++)
428                 {
429                     if (pd.requestHeaderLines[i] != null)
430                     {
431                         sb.append(pd.requestHeaderLines[i]);
432                         sb.append("\r\n");
433                     }
434                 }
435             }
436
437             sb.append("\r\n");
438
439             OutputStream out = s.getOutputStream();
440
441             out.write(StringEncoder.GetBytes(sb.toString()));
442             out.flush();
443
444                         /* Now parse the HTTP response */
445
446             byte[] buffer = new byte[1024];
447             InputStream in = s.getInputStream();
448
449             int len = ClientServerHello.readLineRN(in, buffer);
450
451             String httpReponse = StringEncoder.GetString(buffer, 0, len);
452
453             if (httpReponse.startsWith("HTTP/") == false)
454             {
455                 throw new IOException("The proxy did not send back a valid HTTP response.");
456             }
457
458                         /* "HTTP/1.X XYZ X" => 14 characters minimum */
459
460             if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' '))
461             {
462                 throw new IOException("The proxy did not send back a valid HTTP response.");
463             }
464
465             int errorCode = 0;
466
467             try
468             {
469                 errorCode = Integer.parseInt(httpReponse.substring(9, 12));
470             }
471             catch (NumberFormatException ignore)
472             {
473                 throw new IOException("The proxy did not send back a valid HTTP response.");
474             }
475
476             if ((errorCode < 0) || (errorCode > 999))
477             {
478                 throw new IOException("The proxy did not send back a valid HTTP response.");
479             }
480
481             if (errorCode != 200)
482             {
483                 throw new HTTPProxyException(httpReponse.substring(13), errorCode);
484             }
485
486                         /* OK, read until empty line */
487
488             while (true)
489             {
490                 len = ClientServerHello.readLineRN(in, buffer);
491                 if (len == 0)
492                 {
493                     break;
494                 }
495             }
496             return s;
497         }
498
499         throw new IOException("Unsupported ProxyData");
500     }
501
502     private void startReceiver() throws IOException
503     {
504         receiveThread = new Thread(new Runnable()
505         {
506             public void run()
507             {
508                 try
509                 {
510                     receiveLoop();
511                 }
512                 catch (Exception e)
513                 {
514                     close(e, false);
515
516                     log.warning("Receive thread: error in receiveLoop: " + e.getMessage());
517                 }
518
519                 if (log.isDebugEnabled())
520                 {
521                     log.debug("Receive thread: back from receiveLoop");
522                 }
523
524                                 /* Tell all handlers that it is time to say goodbye */
525
526                 if (km != null)
527                 {
528                     try
529                     {
530                         km.handleMessage(null, 0);
531                     }
532                     catch (IOException ignored)
533                     {
534                     }
535                 }
536
537                 for (HandlerEntry he : messageHandlers)
538                 {
539                     try
540                     {
541                         he.mh.handleMessage(null, 0);
542                     }
543                     catch (Exception ignore)
544                     {
545                     }
546                 }
547             }
548         });
549
550         receiveThread.setDaemon(true);
551         receiveThread.start();
552     }
553
554     public void clientInit(Socket socket, String softwareversion, CryptoWishList cwl,
555                            ServerHostKeyVerifier verifier, DHGexParameters dhgex, SecureRandom rnd) throws IOException
556     {
557                 /* First, establish the TCP connection to the SSH-2 server */
558
559         sock = socket;
560
561                 /* Parse the server line and say hello - important: this information is later needed for the
562                  * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
563                  * for later use.
564                  */
565
566         ClientServerHello csh = ClientServerHello.clientHello(softwareversion, sock.getInputStream(),
567                 sock.getOutputStream());
568
569         tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
570         String hostname = sock.getInetAddress().getHostName();
571         int port = sock.getPort();
572
573         km = new ClientKexManager(this, csh, cwl, hostname, port, verifier, rnd);
574         km.initiateKEX(cwl, dhgex, null, null);
575
576         startReceiver();
577     }
578
579     public void clientInit(String hostname, int port, String softwareversion, CryptoWishList cwl,
580                            ServerHostKeyVerifier verifier, DHGexParameters dhgex, int connectTimeout, SecureRandom rnd,
581                            ProxyData proxyData) throws IOException
582     {
583                 /* First, establish the TCP connection to the SSH-2 server */
584
585         sock = establishConnection(hostname, port, proxyData, connectTimeout);
586
587                 /* Parse the server line and say hello - important: this information is later needed for the
588                  * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
589                  * for later use.
590                  */
591
592         ClientServerHello csh = ClientServerHello.clientHello(softwareversion, sock.getInputStream(),
593                 sock.getOutputStream());
594
595         tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
596
597         km = new ClientKexManager(this, csh, cwl, hostname, port, verifier, rnd);
598         km.initiateKEX(cwl, dhgex, null, null);
599
600         startReceiver();
601     }
602
603     public void serverInit(ServerConnectionState state) throws IOException
604     {
605                 /* TCP connection is already established */
606
607         this.sock = state.s;
608
609                 /* Parse the client line and say hello - important: this information is later needed for the
610                  * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
611                  * for later use.
612                  */
613
614         state.csh = ClientServerHello.serverHello(state.softwareversion, sock.getInputStream(), sock.getOutputStream());
615
616         tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), state.generator);
617
618         km = new ServerKexManager(state);
619         km.initiateKEX(state.next_cryptoWishList, null, state.next_dsa_key, state.next_rsa_key);
620
621         startReceiver();
622     }
623
624     public void registerMessageHandler(MessageHandler mh, int low, int high)
625     {
626         HandlerEntry he = new HandlerEntry();
627         he.mh = mh;
628         he.low = low;
629         he.high = high;
630
631         synchronized (messageHandlers)
632         {
633             messageHandlers.add(he);
634         }
635     }
636
637     public void removeMessageHandler(MessageHandler mh, int low, int high)
638     {
639         synchronized (messageHandlers)
640         {
641             for (int i = 0; i < messageHandlers.size(); i++)
642             {
643                 HandlerEntry he = messageHandlers.get(i);
644                 if ((he.mh == mh) && (he.low == low) && (he.high == high))
645                 {
646                     messageHandlers.remove(i);
647                     break;
648                 }
649             }
650         }
651     }
652
653     public void sendKexMessage(byte[] msg) throws IOException
654     {
655         synchronized (connectionSemaphore)
656         {
657             if (connectionClosed)
658             {
659                 throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
660             }
661
662             flagKexOngoing = true;
663
664             try
665             {
666                 tc.sendMessage(msg);
667             }
668             catch (IOException e)
669             {
670                 close(e, false);
671                 throw e;
672             }
673         }
674     }
675
676     public void kexFinished() throws IOException
677     {
678         synchronized (connectionSemaphore)
679         {
680             flagKexOngoing = false;
681             connectionSemaphore.notifyAll();
682         }
683     }
684
685     /**
686      *
687      * @param cwl
688      * @param dhgex
689      * @param dsa may be null if this is a client connection
690      * @param rsa may be null if this is a client connection
691      * @throws IOException
692      */
693     public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex, DSAPrivateKey dsa, RSAPrivateKey rsa)
694             throws IOException
695     {
696         synchronized (connectionSemaphore)
697         {
698             if (connectionClosed)
699                                 /* Inform the caller that there is no point in triggering a new kex */
700                 throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
701         }
702
703         km.initiateKEX(cwl, dhgex, dsa, rsa);
704     }
705
706     public void changeRecvCipher(BlockCipher bc, MAC mac)
707     {
708         tc.changeRecvCipher(bc, mac);
709     }
710
711     public void changeSendCipher(BlockCipher bc, MAC mac)
712     {
713         tc.changeSendCipher(bc, mac);
714     }
715
716     public void sendAsynchronousMessage(byte[] msg) throws IOException
717     {
718         sendAsynchronousMessage(msg, null);
719     }
720
721     public void sendAsynchronousMessage(byte[] msg, Runnable run) throws IOException
722     {
723         synchronized (asynchronousQueue)
724         {
725             asynchronousQueue.add(new AsynchronousEntry(msg, run));
726             asynchronousPending = true;
727
728                         /* This limit should be flexible enough. We need this, otherwise the peer
729                          * can flood us with global requests (and other stuff where we have to reply
730                          * with an asynchronous message) and (if the server just sends data and does not
731                          * read what we send) this will probably put us in a low memory situation
732                          * (our send queue would grow and grow and...) */
733
734             if (asynchronousQueue.size() > 100)
735             {
736                 throw new IOException("Error: the peer is not consuming our asynchronous replies.");
737             }
738
739                         /* Check if we have an asynchronous sending thread */
740
741             if (asynchronousThread == null)
742             {
743                 asynchronousThread = new AsynchronousWorker();
744                 asynchronousThread.setDaemon(true);
745                 asynchronousThread.start();
746
747                                 /* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
748             }
749
750             asynchronousQueue.notifyAll();
751         }
752     }
753
754     public void setConnectionMonitors(List<ConnectionMonitor> monitors)
755     {
756         synchronized (this)
757         {
758             connectionMonitors = new Vector<ConnectionMonitor>();
759             connectionMonitors.addAll(monitors);
760         }
761     }
762
763     /**
764      * True if no response message expected.
765      */
766     private boolean idle;
767
768     /**
769      * Send a message but ensure that all queued messages are being sent first.
770      *
771      * @param msg
772      * @throws IOException
773      */
774     public void sendMessage(byte[] msg) throws IOException
775     {
776         synchronized (asynchronousQueue)
777         {
778             while (asynchronousPending)
779             {
780                 try
781                 {
782                     asynchronousQueue.wait(1000);
783                 }
784                 catch (InterruptedException e)
785                 {
786                 }
787             }
788         }
789
790         sendMessageImmediate(msg);
791     }
792
793     /**
794      * Send message, ignore queued async messages that have not been delivered yet.
795      * Will be called directly from the asynchronousThread thread.
796      *
797      * @param msg
798      * @throws IOException
799      */
800     public void sendMessageImmediate(byte[] msg) throws IOException
801     {
802         if (Thread.currentThread() == receiveThread)
803         {
804             throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!");
805         }
806
807         boolean wasInterrupted = false;
808
809         try
810         {
811             synchronized (connectionSemaphore)
812             {
813                 while (true)
814                 {
815                     if (connectionClosed)
816                     {
817                         throw (IOException) new IOException("Sorry, this connection is closed.")
818                                 .initCause(reasonClosedCause);
819                     }
820
821                     if (flagKexOngoing == false)
822                     {
823                         break;
824                     }
825
826                     try
827                     {
828                         connectionSemaphore.wait();
829                     }
830                     catch (InterruptedException e)
831                     {
832                         wasInterrupted = true;
833                     }
834                 }
835
836                 try
837                 {
838                     tc.sendMessage(msg);
839                     idle = false;
840                 }
841                 catch (IOException e)
842                 {
843                     close(e, false);
844                     throw e;
845                 }
846             }
847         }
848         finally
849         {
850             if (wasInterrupted)
851                 Thread.currentThread().interrupt();
852         }
853     }
854
855     public void receiveLoop() throws IOException
856     {
857         byte[] msg = new byte[35000];
858
859         while (true)
860         {
861             int msglen;
862             try
863             {
864                 msglen = tc.receiveMessage(msg, 0, msg.length);
865             }
866             catch (SocketTimeoutException e)
867             {
868                 // Timeout in read
869                 if (idle)
870                 {
871                     log.debug("Ignoring socket timeout");
872                     continue;
873                 }
874                 throw e;
875             }
876             idle = true;
877
878             int type = msg[0] & 0xff;
879
880             if (type == Packets.SSH_MSG_IGNORE)
881             {
882                 continue;
883             }
884
885             if (type == Packets.SSH_MSG_DEBUG)
886             {
887                 if (log.isDebugEnabled())
888                 {
889                     TypesReader tr = new TypesReader(msg, 0, msglen);
890                     tr.readByte();
891                     tr.readBoolean();
892                     StringBuilder debugMessageBuffer = new StringBuilder();
893                     debugMessageBuffer.append(tr.readString("UTF-8"));
894
895                     for (int i = 0; i < debugMessageBuffer.length(); i++)
896                     {
897                         char c = debugMessageBuffer.charAt(i);
898
899                         if ((c >= 32) && (c <= 126))
900                         {
901                             continue;
902                         }
903                         debugMessageBuffer.setCharAt(i, '\uFFFD');
904                     }
905
906                     log.debug("DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'");
907                 }
908                 continue;
909             }
910
911             if (type == Packets.SSH_MSG_UNIMPLEMENTED)
912             {
913                 throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen.");
914             }
915
916             if (type == Packets.SSH_MSG_DISCONNECT)
917             {
918                 TypesReader tr = new TypesReader(msg, 0, msglen);
919                 tr.readByte();
920                 int reason_code = tr.readUINT32();
921                 StringBuilder reasonBuffer = new StringBuilder();
922                 reasonBuffer.append(tr.readString("UTF-8"));
923
924                                 /*
925                                  * Do not get fooled by servers that send abnormal long error
926                                  * messages
927                                  */
928
929                 if (reasonBuffer.length() > 255)
930                 {
931                     reasonBuffer.setLength(255);
932                     reasonBuffer.setCharAt(254, '.');
933                     reasonBuffer.setCharAt(253, '.');
934                     reasonBuffer.setCharAt(252, '.');
935                 }
936
937                                 /*
938                                  * Also, check that the server did not send characters that may
939                                  * screw up the receiver -> restrict to reasonable US-ASCII
940                                  * subset -> "printable characters" (ASCII 32 - 126). Replace
941                                  * all others with 0xFFFD (UNICODE replacement character).
942                                  */
943
944                 for (int i = 0; i < reasonBuffer.length(); i++)
945                 {
946                     char c = reasonBuffer.charAt(i);
947
948                     if ((c >= 32) && (c <= 126))
949                     {
950                         continue;
951                     }
952                     reasonBuffer.setCharAt(i, '\uFFFD');
953                 }
954
955                 throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): "
956                         + reasonBuffer.toString());
957             }
958
959                         /*
960                          * Is it a KEX Packet?
961                          */
962
963             if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS)
964                     || ((type >= 30) && (type <= 49)))
965             {
966                 km.handleMessage(msg, msglen);
967                 continue;
968             }
969
970             MessageHandler mh = null;
971
972             for (int i = 0; i < messageHandlers.size(); i++)
973             {
974                 HandlerEntry he = messageHandlers.get(i);
975                 if ((he.low <= type) && (type <= he.high))
976                 {
977                     mh = he.mh;
978                     break;
979                 }
980             }
981
982             if (mh == null)
983             {
984                 throw new IOException("Unexpected SSH message (type " + type + ")");
985             }
986
987             mh.handleMessage(msg, msglen);
988         }
989     }
990 }