Initial framework migration to netty.
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / DispatcherImpl.java
index ed5168d047f2dfca23679857f9cf0e9dd620a2a9..e05c9e378376340f9d950be3e7488e498397fef5 100644 (file)
  */
 package org.opendaylight.protocol.framework;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
+
 import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
 import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import javax.net.ssl.SSLContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
 public final class DispatcherImpl implements Dispatcher, SessionParent {
 
-       private static final Logger logger = LoggerFactory.getLogger(Dispatcher.class);
-
-       public static final int DEFAULT_MAX_RECONNECT_COUNT = 30;
-
-       public static final int DEFAULT_RECONNECT_MILLIS = 30000;
-       public static final int DEFAULT_SERVICE_MILLIS = 1000;
-
-       private static final int BUFFER_SIZE = 16384;
-
-       private static final boolean SSL_ENABLED = true;
-
-       private int serviceMillis = DEFAULT_SERVICE_MILLIS;
-       private int reconnectMillis = 5000;
-
-       private int maxConnectCount = 0;
-
-       /**
-        * List of servers created by this dispatcher. Servers are identified as a pair Server and the InetSocketAddress to
-        * which the server is bound.
-        */
-       private final Map<InetSocketAddress, ProtocolServer> servers = new HashMap<InetSocketAddress, ProtocolServer>();
-
-       /**
-        * Mapping of client Sessions to keys (Either clients created by the dispatcher directly or clients connected to one
-        * of the dispatchers server).
-        */
-       private final Map<ProtocolSession, SelectionKey> sessionKeys = new HashMap<ProtocolSession, SelectionKey>();
-
-       /**
-        * List of clients created by this dispatcher. Each client has its own Session. They are identified as a pair of
-        * Session and the InetSocketAddress to which they are connected.
-        */
-       private final BiMap<InetSocketAddress, ProtocolSession> clients;
-
-       /**
-        * Timer object grouping FSM Timers
-        */
-       private final Timer stateTimer;
-
-       /**
-        * Variable indicating that there was a request for stopping this dispatcher.
-        */
-       private volatile boolean requestStop = false;
-
-       private final Thread innerThread;
-
-       private final ExecutorService executorService;
-
-       private final InnerRun innerRun;
-
-       /**
-        * Configuration dependency used for testing of reusability.
-        */
-       private final ThreadFactory threadFactory;
-
-       private final class InnerRun implements Runnable {
-
-               /**
-                * Common selector for client/server parts.
-                */
-               public final Selector selector;
-
-               private final DispatcherImpl parent;
+       final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
 
-               protected InnerRun(final DispatcherImpl parent) throws IOException {
-                       final Selector s = SelectorProvider.provider().openSelector();
+               private final ProtocolServer server;
 
-                       if (SSL_ENABLED)
-                               this.selector = new SSLSelector(s);
-                       else
-                               this.selector = s;
-
-                       this.parent = parent;
+               public ServerChannelInitializer(final ProtocolServer server) {
+                       this.server = server;
                }
 
                @Override
-               public void run() {
-                       // this method finishes only when stop() method was called
-                       while (!this.parent.requestStop) {
-                               try {
-                                       this.selector.select();
-                               } catch (final IOException e) {
-                                       logger.warn("Selection operation failed", e);
-                                       break;
-                               }
-
-                               /*
-                                * This block runs under lock. The idea is that
-                                * selection key notifiers will first acquire the
-                                * lock, then wake up the selector, then do their
-                                * modifications.
-                                *
-                                * This means that there are two possibilities:
-                                *
-                                * 1) we arrive here as a result of a selector wake
-                                *    up, at which point the modifier already holds
-                                *    the lock, and we'll wait for it.
-                                *
-                                * 2) we arrive here as a result of an event, in which
-                                *    case we will prevent modifiers from starting
-                                *    by holding the lock.
-                                */
-                               // logger.debug("Acquiring lock");
-                               synchronized (this) {
-                                       final Set<SelectionKey> keys = this.selector.selectedKeys();
-                                       if (keys.isEmpty())
-                                               continue;
-
-                                       /*
-                                        * Calculate maximum nanoseconds we can spend on read
-                                        * or write. Each key can do a pair of operations in one
-                                        * iteration.
-                                        */
-                                       final long serviceTime = serviceMillis * 500000 / keys.size();
-
-                                       final Iterator<SelectionKey> selectedKeys = keys.iterator();
-                                       while (selectedKeys.hasNext()) {
-                                               final SelectionKey key = selectedKeys.next();
-                                               selectedKeys.remove();
-
-                                               if (!key.isValid()) {
-                                                       continue;
-                                               }
-
-                                               try {
-                                                       if (key.isAcceptable()) {
-                                                               this.parent.accept(key);
-                                                       }
-                                                       if (key.isConnectable()) {
-                                                               if (!this.parent.finishConnection(key)) {
-                                                                       continue;
-                                                               }
-                                                       }
-
-                                                       /*
-                                                        * Split read/write fairness. If this key is only
-                                                        * readable or only writable, double the time
-                                                        */
-                                                       final long keyTime = key.isReadable() == key.isWritable() ? serviceTime : 2 * serviceTime;
-
-                                                       /*
-                                                        * If this key is readable, read it. That operation may
-                                                        * detect end-of-stream, which it will report internally,
-                                                        * and inform us by returning true.
-                                                        *
-                                                        * If that is the case, we do not want to proceed with the
-                                                        * write case, because the key may no longer be valid.
-                                                        */
-                                                       if (key.isReadable() && this.parent.read(key, System.nanoTime() + keyTime)) {
-                                                               continue;
-                                                       }
-
-                                                       /*
-                                                        * If this key is writable, write it. This may completely
-                                                        * drain the output queue, in which case write returns true.
-                                                        *
-                                                        * If that is the case, we need to suspend selecting for
-                                                        * writability -- it will be re-enabled once the queue goes
-                                                        * non-empty.
-                                                        */
-                                                       if (key.isWritable() && this.parent.write(key, System.nanoTime() + keyTime) && key.isValid()) {
-                                                               key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                                                       }
-
-                                               } catch (final IOException e) {
-                                                       logger.debug("Channel {} incurred unexpected error, closing it", key.channel(), e);
-                                                       key.cancel();
-                                                       try {
-                                                               key.channel().close(); // close the channel that caused problems
-                                                       } catch (final IOException e1) {
-                                                               logger.error("Channel: {} could not be closed, because {}", key.channel(), e1.getMessage(), e1);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-
-                       logger.trace("Ended run of dispatcher.");
-                       try {
-                               this.selector.close();
-                       } catch (final IOException e) {
-                               throw new RuntimeException("Failed to close selector", e);
-                       }
-               }
-       }
-
-       /**
-        * Creates an instance of Dispatcher, gets the default selector and opens it.
-        *
-        * @param tfactory default Thread Factory
-        * @throws IOException if some error occurred during opening the selector
-        */
-       public DispatcherImpl(final ThreadFactory tfactory) throws IOException {
-               this.threadFactory = tfactory;
-               this.executorService = Executors.newSingleThreadExecutor(tfactory);
-               this.stateTimer = new Timer();
-               this.clients = HashBiMap.create();
-               this.innerRun = new InnerRun(this);
-               this.innerThread = tfactory.newThread(this.innerRun);
-               this.innerThread.start();
-       }
-
-       protected synchronized ProtocolServer startServer(final ServerSocketChannel serverChannel, final InetSocketAddress address,
-                       final ProtocolConnectionFactory connectionFactory, final ProtocolSessionFactory sfactory,
-                       final ProtocolInputStreamFactory isFactory) throws IOException {
-
-               // Notify the thread to update its selection keys
-               this.innerRun.selector.wakeup();
-
-               // logger.debug("Selector notified.");
-               serverChannel.configureBlocking(false);
-               serverChannel.bind(address);
-
-               final SelectionKey key = serverChannel.register(this.innerRun.selector, SelectionKey.OP_ACCEPT);
-               final ProtocolServer server = new ProtocolServer(this, address, serverChannel, connectionFactory, sfactory, isFactory);
-               key.attach(server);
-               this.servers.put(address, server);
-
-               logger.info("Server created.");
-               return server;
-       }
-
-       @Override
-       public ProtocolServer createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
-                       final ProtocolSessionFactory sfactory, final ProtocolInputStreamFactory isFactory) throws IOException {
-               synchronized (this.innerRun) {
-                       if (this.servers.get(address) != null) {
-                               logger.warn("Server with this address: {} was already created.", address);
-                               throw new IllegalStateException("Server with this address: " + address + " was already created.");
-                       }
-
-                       return this.startServer(ServerSocketChannel.open(), address, connectionFactory, sfactory, isFactory);
-               }
-       }
-
-       @Override
-       public ProtocolServer createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
-                       final ProtocolSessionFactory sfactory, final ProtocolInputStreamFactory isFactory, final SSLContext context) throws IOException {
-
-               if (!SSL_ENABLED)
-                       throw new UnsupportedOperationException("SSL has not been enabled");
-
-               synchronized (this.innerRun) {
-                       if (this.servers.get(address) != null) {
-                               logger.warn("Server with this address: {} was already created.", address);
-                               throw new IllegalStateException("Server with this address: " + address + " was already created.");
-                       }
-
-                       return this.startServer(SSLServerSocketChannel.open(this.innerRun.selector, context, this.executorService), address,
-                                       connectionFactory, sfactory, isFactory);
-               }
-       }
-
-       private void connectChannel(final SelectionKey key) {
-               final SessionStreams state = (SessionStreams) key.attachment();
-               state.timer = null;
-
-               state.connectCount++;
-               logger.debug("Connecting to {} attempt {}", state.connection.getPeerAddress(), state.connectCount);
-
-               final SocketChannel channel = (SocketChannel) key.channel();
-               try {
-                       channel.connect(state.connection.getPeerAddress());
-               } catch (final IOException e) {
-                       this.connectFailed(key, e);
-                       return;
-               }
-
-               if (channel.isConnected()) {
-                       logger.trace("Connected, update interestops");
-                       key.interestOps(SelectionKey.OP_READ);
-                       state.getSession().startSession();
-               } else
-                       key.interestOps(SelectionKey.OP_CONNECT);
-       }
-
-       private void connectFailed(final SelectionKey key, final IOException e) {
-               final SessionStreams state = (SessionStreams) key.attachment();
-
-               key.interestOps(0);
-
-               if (this.maxConnectCount >= 0 && state.connectCount >= this.maxConnectCount) {
-                       logger.debug("Connection to {} failed", state.connection.getPeerAddress().getAddress(), e);
-                       this.clients.inverse().remove(state.getSession());
-                       state.getSession().onConnectionFailed(e);
-                       return;
+               protected void initChannel(final SocketChannel ch) throws Exception {
+                       final ProtocolSession session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
+                       ch.pipeline().addLast(DispatcherImpl.this.handlerFactory.getHandlers(session));
                }
 
-               logger.trace("Connect to {} failed, will retry in {} milliseconds", state.connection.getPeerAddress().getAddress(),
-                               this.reconnectMillis, e);
-               state.timer = new TimerTask() {
-                       @Override
-                       public void run() {
-                               DispatcherImpl.this.connectChannel(key);
-                       }
-               };
-               this.stateTimer.schedule(state.timer, this.reconnectMillis);
        }
 
-       private ProtocolSession startClient(final SocketChannel channel, final ProtocolConnection connection,
-                       final ProtocolSessionFactory sfactory, final ProtocolInputStreamFactory isFactory) throws IOException {
+       final class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
 
-               // Notify the thread to update its selection keys
-               this.innerRun.selector.wakeup();
+               private final ProtocolSessionFactory sfactory;
 
-               channel.configureBlocking(false);
+               private final ProtocolConnection connection;
 
-               final ProtocolSession session;
-               final SelectionKey key;
-               synchronized (this) {
-                       session = sfactory.getProtocolSession(this, this.stateTimer, connection, 0);
+               private ProtocolSession session;
 
-                       final PipedOutputStream pos = new PipedOutputStream();
-                       final PipedInputStream pis = new PipedInputStream(pos, session.maximumMessageSize());
-
-                       key = channel.register(this.innerRun.selector, SelectionKey.OP_CONNECT);
-                       key.attach(new SessionStreams(pos, pis, isFactory.getProtocolInputStream(pis, session.getMessageFactory()), session, connection));
-
-                       this.sessionKeys.put(session, key);
-                       this.clients.put(connection.getPeerAddress(), session);
-                       logger.info("Client created.");
-               }
-
-               this.connectChannel(key);
-               return session;
-       }
-
-       @Override
-       public ProtocolSession createClient(final ProtocolConnection connection, final ProtocolSessionFactory sfactory,
-                       final ProtocolInputStreamFactory isFactory) throws IOException {
-               synchronized (this.innerRun) {
-                       if (this.clients.containsKey(connection.getPeerAddress())) {
-                               logger.warn("Attempt to create duplicate client session to the same address: {}", connection.getPeerAddress());
-                               throw new IllegalStateException("Attempt to create duplicate client session to the same address: "
-                                               + connection.getPeerAddress());
-                       }
-
-                       return this.startClient(SocketChannel.open(), connection, sfactory, isFactory);
+               public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory sfactory) {
+                       this.connection = connection;
+                       this.sfactory = sfactory;
                }
-       }
 
-       @Override
-       public ProtocolSession createClient(final ProtocolConnection connection, final ProtocolSessionFactory sfactory,
-                       final ProtocolInputStreamFactory isFactory, final SSLContext context) throws IOException {
-
-               if (!SSL_ENABLED)
-                       throw new UnsupportedOperationException("SSL has not been enabled");
-
-               synchronized (this.innerRun) {
-                       if (this.clients.containsKey(connection.getPeerAddress())) {
-                               logger.warn("Attempt to create duplicate client session to the same address: {}", connection.getPeerAddress());
-                               throw new IllegalStateException("Attempt to create duplicate client session to the same address: "
-                                               + connection.getPeerAddress());
-                       }
-
-                       final SocketChannel sock = SSLSocketChannel.open(SocketChannel.open(), context, this.executorService, null);
-                       return this.startClient(sock, connection, sfactory, isFactory);
+               @Override
+               protected void initChannel(final SocketChannel ch) throws Exception {
+                       this.session = this.sfactory.getProtocolSession(DispatcherImpl.this, DispatcherImpl.this.stateTimer, this.connection, 0,
+                                       ch.pipeline().context(ProtocolSessionOutboundHandler.class));
+                       ch.pipeline().addLast(DispatcherImpl.this.handlerFactory.getHandlers(this.session));
                }
-       }
 
-       /**
-        * Requests to stop dispatchers run() method. This method wakes up the selector, even if there are no selectedKeys
-        * to stop blocking the thread.
-        */
-       public void stop() {
-               logger.debug("Requested stop of the Dispatcher.");
-               this.requestStop = true;
-               this.innerRun.selector.wakeup();
-               try {
-                       this.innerThread.join();
-               } catch (final InterruptedException e) {
-                       logger.error("Stopping interrupted.", e);
+               public ProtocolSession getSession() {
+                       return this.session;
                }
 
-               this.executorService.shutdown();
-       }
-
-       /**
-        * Removes given server from list of servers created by this dispatcher.
-        *
-        * @param server to be removed
-        */
-       void removeServer(final ProtocolServer server) {
-               this.servers.remove(server.getAddress());
-               logger.trace("Server removed.");
        }
 
-       /**
-        * Reads from socket and sends data to session through Piped Streams.
-        *
-        * @param key selection key that was marked as ready to read from
-        * @return true if the read has encountered end of channel (so no data will ever come) false if the method did read
-        *         all of its input
-        * @throws IOException if there was some error with IO streams
-        */
-       private boolean read(final SelectionKey key, final long deadline) throws IOException {
-               logger.trace("Started reading.");
-               final SocketChannel chan = (SocketChannel) key.channel();
-               final SessionStreams streams = (SessionStreams) key.attachment();
-               final ProtocolInputStream pcepis = streams.getProtocolInputStream();
-               final PipedOutputStream pos = streams.getPipedOutputStream();
-               final PipedInputStream pis = streams.getPipedInputStream();
-               final ProtocolSession session = streams.getSession();
-
-               try {
-                       final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
-                       int r = chan.read(byteBuffer);
-
-                       while (r != 0) {
-                               byteBuffer.flip();
-
-                               // if we have some unread data in the buffer
-                               while (byteBuffer.hasRemaining()) {
-                                       final int pisFree = session.maximumMessageSize() - pis.available();
-                                       if (pisFree == 0)
-                                               throw new IOException("Protocol failed to detect no-progress situation");
-
-                                       int toMove = byteBuffer.remaining();
-
-                                       // Do not try to write more than the input stream can accept
-                                       if (toMove > pisFree)
-                                               toMove = pisFree;
-
-                                       // Write to the output stream and adjust buffer position
-                                       pos.write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), toMove);
-                                       byteBuffer.position(byteBuffer.position() + toMove);
-
-                                       // Notify input stream that it can read more stuff
-                                       pos.flush();
-
-                                       // process any messages which became available
-                                       while (pcepis.isMessageAvailable()) {
-                                               // read and parse message
-                                               final ProtocolMessage msg = pcepis.getMessage();
-                                               // send it to session for handling
-                                               session.handleMessage(msg);
-                                       }
-                               }
-                               byteBuffer.clear();
-
-                               /*
-                                * We reached end-of-input stream. Notify close the output stream
-                                * and notify the user. He is then supposed to close the session,
-                                * releasing the write-end of things.
-                                */
-                               if (r == -1) {
-                                       logger.warn("End of input stream reached.");
-                                       /*
-                                        * The input stream has some bytes, but no others are coming
-                                        * in. This means it should have been a complete message,
-                                        * but is not -> that's a malformed message.
-                                        */
-                                       if (pis.available() != 0) {
-                                               logger.warn("Received incomplete message.");
-                                               throw new DeserializerException("Incomplete message at the end of input stream");
-                                       }
-                                       key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
-                                       session.endOfInput();
-                                       return true;
-                               }
-
-                               if (!chan.isOpen())
-                                       return true;
-
-                               final long now = System.nanoTime();
-                               if (deadline <= now) {
-                                       logger.trace("Read service time exceeded by {} nanoseconds.", now - deadline);
-                                       break;
-                               }
-
-                               r = chan.read(byteBuffer);
-                       }
-               } catch (final DeserializerException e) {
-                       // An unrecoverable malformed message has been received. Notify
-                       // session to take care of the fallout.
-                       logger.warn("Malformed message {}", e.getMessage(), e);
-                       session.handleMalformedMessage(e);
-               } catch (final DocumentedException e) {
-                       // A potentially recoverable malformed message has been received.
-                       // Push it to the session, it will take care of the details.
-                       logger.warn("Malformed message {}", e.getMessage(), e);
-                       session.handleMalformedMessage(e);
-               } catch (final RuntimeException e) {
-                       logger.error("Unrecoverable internal session error: {}", e.getMessage(), e);
-                       throw new IOException("Unrecoverable internal session error", e);
-               }
-               return false;
-       }
+       final class ProtocolSessionPromise extends DefaultPromise<ProtocolSession> {
+               private final ChannelFuture cf;
 
-       /**
-        * Writes data from ProtocolOutputStream to socket.
-        *
-        * @param key selection key that was marked as ready to write from
-        * @return false if the writing was not successful true if the queue of messages became empty
-        * @throws IOException if there was some error with the IO streams
-        */
-       private boolean write(final SelectionKey key, final long deadline) throws IOException {
-               logger.trace("Started writing.");
-
-               // TODO: promote to hard error?
-               final SocketChannel socketChannel = (SocketChannel) key.channel();
-               if (!socketChannel.isConnected()) {
-                       logger.warn("Channel is not connected yet.");
-                       return false;
+               ProtocolSessionPromise(final ChannelFuture cf) {
+                       super();
+                       this.cf = cf;
                }
 
-               final SessionStreams streams = (SessionStreams) key.attachment();
-               final Queue<ByteBuffer> queue = streams.getSession().getStream().getBuffers();
-
-               synchronized (queue) {
-                       logger.trace("Synchronized writing started.");
-                       // Write until there's not more data
-                       while (!queue.isEmpty()) {
-                               final ByteBuffer buf = queue.element();
-                               socketChannel.write(buf);
-                               if (buf.remaining() > 0) {
-                                       /*
-                                        * If there is not enough space in the socket to write all the data
-                                        * stay in writing mode and attempt to write after the next select()
-                                        * call
-                                        */
-                                       logger.trace("Socket queue full.");
-                                       return false;
-                               }
-                               queue.remove();
-
-                               final long now = System.nanoTime();
-                               if (deadline <= now) {
-                                       logger.trace("Write service time exceeded by {} nanoseconds.", now - deadline);
-                                       return false;
-                               }
-                       }
-                       logger.trace("Write queue empty.");
-                       return true;
+               @Override
+               public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+                       this.cf.cancel(mayInterruptIfRunning);
+                       return super.cancel(mayInterruptIfRunning);
                }
        }
 
-       private void acceptChannel(final ProtocolServer server, final SocketChannel socketChannel, final InetSocketAddress clientAddress)
-                       throws IOException {
-               socketChannel.configureBlocking(false);
+       private static final Logger logger = LoggerFactory.getLogger(DispatcherImpl.class);
 
-               final ProtocolSession s = server.createSession(this.stateTimer, clientAddress);
-               final PipedOutputStream pos = new PipedOutputStream();
-               final PipedInputStream pis = new PipedInputStream(pos, s.maximumMessageSize());
-               final ProtocolInputStream inputStream = server.createInputStream(pis, s.getMessageFactory());
+       private final EventLoopGroup bossGroup;
 
-               final SelectionKey skey = socketChannel.register(this.innerRun.selector, SelectionKey.OP_READ);
-               skey.attach(new SessionStreams(pos, pis, inputStream, s, null));
-               this.sessionKeys.put(s, skey);
-
-               // FIXME: catch RuntimeExceptions here, undo the put/attach above?
-               // or can we move the .put() after this call?
-               s.startSession();
-       }
+       private final EventLoopGroup workerGroup;
 
        /**
-        * Accepts incoming connection from a client to one of the running servers.
-        *
-        * @param key selection key that was marked as ready to accept connections
-        * @throws IOException if there was some error with IO streams
+        * Timer object grouping FSM Timers
         */
-       private void accept(final SelectionKey key) throws IOException {
-               final ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
-               final SocketChannel socketChannel = serverSocketChannel.accept();
-               if (socketChannel == null)
-                       return;
-
-               final InetSocketAddress clientAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
-               logger.info("Requested connection for: {}", clientAddress.getAddress().getHostAddress());
-
-               try {
-                       this.acceptChannel((ProtocolServer) key.attachment(), socketChannel, clientAddress);
-               } catch (final Exception e) {
-                       logger.warn("Failed to start protocol session", e);
-                       socketChannel.close();
-               }
-       }
+       private final Timer stateTimer;
 
-       /**
-        * Finishes connection of the client to the server. Starts session.
-        *
-        * @param key selection key that was marked as ready to finish connection
-        */
-       private boolean finishConnection(final SelectionKey key) {
-               final SocketChannel socketChannel = (SocketChannel) key.channel();
-               final SessionStreams streams = (SessionStreams) key.attachment();
-               logger.trace("Finishing connection for key {}", key);
-               try {
-                       if (socketChannel.finishConnect()) {
-                               key.interestOps(SelectionKey.OP_READ);
-                               streams.getSession().startSession();
-                       }
-               } catch (final IOException e) {
-                       this.connectFailed(key, e);
-                       return false;
-               }
-               return true;
-       }
+       private final ProtocolHandlerFactory handlerFactory;
 
-       /**
-        * Closes channel and cancels key assigned to given session.
-        *
-        * @param session session that was closed
-        */
-       void closeSessionSockets(final ProtocolSession session) {
-               synchronized (this.innerRun) {
-                       logger.debug("Trying to close sesion.");
-                       final SelectionKey key = this.sessionKeys.get(session);
-                       if (key != null) {
-
-                               try {
-                                       key.channel().close();
-                               } catch (final IOException e) {
-                                       logger.error("Session channel could not be closed.");
-                               } finally {
-                                       final SessionStreams streams = (SessionStreams) key.attachment();
-                                       if (streams.timer != null) {
-                                               streams.timer.cancel();
-                                               streams.timer = null;
-                                       }
-
-                                       logger.trace("Cancelling key.");
-                                       key.cancel();
-
-                                       final PipedOutputStream pos = streams.getPipedOutputStream();
-                                       try {
-                                               pos.close();
-                                       } catch (final IOException e) {
-                                               logger.error("Session-internal output stream could not be closed.");
-                                       } finally {
-                                               final PipedInputStream pis = streams.getPipedInputStream();
-                                               try {
-                                                       pis.close();
-                                               } catch (final IOException e) {
-                                                       logger.error("Session-internal input stream could not be closed.");
-                                               }
-                                       }
-                               }
-                       }
-                       this.sessionKeys.remove(key);
-                       logger.debug("Session sockets closed.");
-               }
+       public DispatcherImpl(final ProtocolMessageFactory factory) {
+               this.bossGroup = new NioEventLoopGroup();
+               this.workerGroup = new NioEventLoopGroup();
+               this.stateTimer = new Timer();
+               this.handlerFactory = new ProtocolHandlerFactory(factory);
        }
 
        @Override
-       public void onSessionClosed(final ProtocolSession session) {
-               synchronized (this.innerRun) {
-                       this.innerRun.selector.wakeup();
-                       this.closeSessionSockets(session);
-                       this.clients.inverse().remove(session);
-                       logger.debug("Session {} removed.", session);
-               }
+       public ProtocolServer createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
+                       final ProtocolSessionFactory sessionFactory) {
+               final ProtocolServer server = new ProtocolServer(address, connectionFactory, sessionFactory);
+               final ServerBootstrap b = new ServerBootstrap();
+               b.group(this.bossGroup, this.workerGroup);
+               b.channel(NioServerSocketChannel.class);
+               b.option(ChannelOption.SO_BACKLOG, 128);
+               b.childHandler(new ServerChannelInitializer(server));
+               b.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+               // Bind and start to accept incoming connections.
+               final ChannelFuture f = b.bind(address);
+               // b.localAddress(address);
+               logger.debug("Server {} created.", server);
+               return server;
        }
 
        @Override
-       public void checkOutputBuffer(final ProtocolSession session) {
-               final SelectionKey key = this.sessionKeys.get(session);
-               key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-               key.selector().wakeup();
+       public Future<ProtocolSession> createClient(final ProtocolConnection connection, final ProtocolSessionFactory sfactory) {
+               final Bootstrap b = new Bootstrap();
+               b.group(this.workerGroup);
+               b.channel(NioSocketChannel.class);
+               b.option(ChannelOption.SO_KEEPALIVE, true);
+               final ClientChannelInitializer init = new ClientChannelInitializer(connection, sfactory);
+               b.handler(init);
+               final ChannelFuture f = b.connect(connection.getPeerAddress());
+               final ProtocolSessionPromise p = new ProtocolSessionPromise(f);
+
+               f.addListener(new ChannelFutureListener() {
+                       @Override
+                       public void operationComplete(final ChannelFuture cf) {
+                               if (cf.isSuccess()) {
+                                       p.setSuccess(init.getSession());
+                                       return;
+                               } else if (cf.isCancelled()) {
+                                       p.cancel(false);
+                               } else
+                                       p.setFailure(cf.cause());
+                       }
+               });
+               logger.debug("Client created.");
+               return p;
        }
 
        @Override
        public void close() throws IOException {
-               for (final Entry<InetSocketAddress, ProtocolServer> s : this.servers.entrySet()) {
-                       s.getValue().close();
-               }
-               for (final Entry<InetSocketAddress, ProtocolSession> s : this.clients.entrySet()) {
-                       s.getValue().close();
-               }
-       }
-
-       /**
-        * Gets milliseconds between reconnects.
-        * @return time in milliseconds between reconnects
-        */
-       public synchronized int getReconnectMillis() {
-               return this.reconnectMillis;
+               this.workerGroup.shutdownGracefully();
+               this.bossGroup.shutdownGracefully();
        }
 
-       /**
-        * Sets milliseconds between reconnects.
-        * @param reconnectMillis new value
-        */
-       public synchronized void setReconnectMillis(final int reconnectMillis) {
-               Preconditions.checkArgument(reconnectMillis > 0, "Reconnect milliseconds value has to be positive");
-               this.reconnectMillis = reconnectMillis;
-               // FIXME: readjust all pending timers
-       }
-
-       /**
-        * Gets maximum tries for connection.
-        * @return max connection count
-        */
-       public synchronized int getMaxConnectCount() {
-               return this.maxConnectCount;
-       }
-
-       /**
-        * Sets maximum tries for connection.
-        * @param maxConnectCount new value
-        */
-       public synchronized void setMaxConnectCount(final int maxConnectCount) {
-               this.maxConnectCount = maxConnectCount;
-               // FIXME: purge all sessions which already exceed the limit
-       }
-
-       public synchronized int getServiceMillis() {
-               return this.serviceMillis;
-       }
-
-       public synchronized void setServiceMillis(final int serviceMillis) {
-               Preconditions.checkArgument(serviceMillis > 0);
-               this.serviceMillis = serviceMillis;
-       }
-
-       /**
-        * Gets thread factory.
-        * @return thread factory
-        */
-       public ThreadFactory getThreadFactory() {
-               return this.threadFactory;
+       @Override
+       public void onSessionClosed(final ProtocolSession session) {
+               // TODO Auto-generated method stub
        }
 }