Merge "Bug 1362: New AsyncWriteTransaction#submit method"
[controller.git] / opendaylight / netconf / netconf-ssh / src / main / java / org / opendaylight / controller / netconf / ssh / NetconfSSHServer.java
index 72135cc7dcdd39acddfd8d69d826f2683ee9b921..670f50ddd09f9b3b80d855ce244f4a4cce47da14 100644 (file)
  */
 package org.opendaylight.controller.netconf.ssh;
 
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.netconf.ssh.threads.SocketThread;
+import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider;
+import org.opendaylight.controller.netconf.ssh.threads.Handshaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Thread that accepts client connections. Accepted socket is forwarded to {@link org.opendaylight.controller.netconf.ssh.threads.Handshaker},
+ * which is executed in {@link #handshakeExecutor}.
+ */
 @ThreadSafe
-public class NetconfSSHServer implements Runnable {
+public final class NetconfSSHServer extends Thread implements AutoCloseable {
 
-    private static boolean acceptMore = true;
-    private ServerSocket ss = null;
-    private static final Logger logger =  LoggerFactory.getLogger(NetconfSSHServer.class);
-    private static final AtomicLong sesssionId = new AtomicLong();
-    private final InetSocketAddress clientAddress;
+    private static final Logger logger = LoggerFactory.getLogger(NetconfSSHServer.class);
+    private static final AtomicLong sessionIdCounter = new AtomicLong();
 
-    private NetconfSSHServer(int serverPort,InetSocketAddress clientAddress) throws Exception{
+    private final ServerSocket serverSocket;
+    private final LocalAddress localAddress;
+    private final EventLoopGroup bossGroup;
+    private final AuthProvider authProvider;
+    private final ExecutorService handshakeExecutor;
+    private volatile boolean up;
 
-        logger.trace("Creating SSH server socket on port {}",serverPort);
-        this.ss = new ServerSocket(serverPort);
-        if (!ss.isBound()){
-            throw new Exception("Socket can't be bound to requested port :"+serverPort);
+    private NetconfSSHServer(int serverPort, LocalAddress localAddress, AuthProvider authProvider, EventLoopGroup bossGroup) throws IOException {
+        super(NetconfSSHServer.class.getSimpleName());
+        this.bossGroup = bossGroup;
+        logger.trace("Creating SSH server socket on port {}", serverPort);
+        this.serverSocket = new ServerSocket(serverPort);
+        if (serverSocket.isBound() == false) {
+            throw new IllegalStateException("Socket can't be bound to requested port :" + serverPort);
         }
         logger.trace("Server socket created.");
-        this.clientAddress = clientAddress;
-
+        this.localAddress = localAddress;
+        this.authProvider = authProvider;
+        this.up = true;
+        handshakeExecutor = Executors.newFixedThreadPool(10);
     }
 
-
-    public static NetconfSSHServer start(int serverPort, InetSocketAddress clientAddress) throws Exception {
-        return new NetconfSSHServer(serverPort, clientAddress);
+    public static NetconfSSHServer start(int serverPort, LocalAddress localAddress, AuthProvider authProvider, EventLoopGroup bossGroup) throws IOException {
+        NetconfSSHServer netconfSSHServer = new NetconfSSHServer(serverPort, localAddress, authProvider, bossGroup);
+        netconfSSHServer.start();
+        return netconfSSHServer;
     }
 
-    public void stop() throws Exception {
-        acceptMore = false;
+    @Override
+    public void close() throws IOException {
+        up = false;
         logger.trace("Closing SSH server socket.");
-        ss.close();
+        serverSocket.close();
+        bossGroup.shutdownGracefully();
         logger.trace("SSH server socket closed.");
     }
 
+    @VisibleForTesting
+    public InetSocketAddress getLocalSocketAddress() {
+        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
+    }
+
     @Override
     public void run() {
-        while (acceptMore) {
-            logger.trace("Starting new socket thread.");
+        while (up) {
+            Socket acceptedSocket = null;
             try {
-               SocketThread.start(ss.accept(), clientAddress, sesssionId.incrementAndGet());
+                acceptedSocket = serverSocket.accept();
             } catch (IOException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                if (up == false) {
+                    logger.trace("Exiting server thread", e);
+                } else {
+                    logger.warn("Exception occurred during socket.accept", e);
+                }
+            }
+            if (acceptedSocket != null) {
+                try {
+                    Handshaker task = new Handshaker(acceptedSocket, localAddress, sessionIdCounter.incrementAndGet(), authProvider, bossGroup);
+                    handshakeExecutor.submit(task);
+                } catch (IOException e) {
+                    logger.warn("Cannot set PEMHostKey, closing connection", e);
+                    try {
+                        acceptedSocket.close();
+                    } catch (IOException e1) {
+                        logger.warn("Ignoring exception while closing socket", e);
+                    }
+                }
             }
         }
+        logger.debug("Server thread is exiting");
     }
 }