BUG-58: refactor to take advantage of netty
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / DispatcherImpl.java
index e513dd7566da2ecc8aa9120250df20e957ee21e5..a6019d0da1322679e299ed3ec2a0036a7d73b0c2 100644 (file)
@@ -7,95 +7,29 @@
  */
 package org.opendaylight.protocol.framework;
 
-import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
-import java.io.IOException;
+import java.io.Closeable;
 import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Timer;
-
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
-public final class DispatcherImpl implements Dispatcher, SessionParent {
-
-       final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-               private final ProtocolServer server;
-
-               private ProtocolSession session;
-
-               public ServerChannelInitializer(final ProtocolServer server) {
-                       this.server = server;
-               }
-
-               @Override
-               protected void initChannel(final SocketChannel ch) throws Exception {
-                       final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
-                       ch.pipeline().addFirst("decoder", factory.getDecoder());
-                       this.session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
-
-                       ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
-                       ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
-               }
-
-               public ProtocolSession getSession() {
-                       return this.session;
-               }
-
-       }
-
-       final class ClientChannelInitializer<T extends ProtocolSession> extends ChannelInitializer<SocketChannel> {
-
-               private final ProtocolSessionFactory<T> sfactory;
-
-               private final ProtocolConnection connection;
-
-               private T session;
-
-               public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) {
-                       this.connection = connection;
-                       this.sfactory = sfactory;
-               }
-
-               @Override
-               protected void initChannel(final SocketChannel ch) throws Exception {
-                       final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
-                       ch.pipeline().addFirst("decoder", factory.getDecoder());
-                       this.session = this.sfactory.getProtocolSession(DispatcherImpl.this, DispatcherImpl.this.stateTimer, this.connection, 0, ch);
-                       ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
-                       ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
-               }
-
-               T getSession() {
-                       return this.session;
-               }
-       }
+public final class DispatcherImpl implements Closeable, Dispatcher {
 
        private static final Logger logger = LoggerFactory.getLogger(DispatcherImpl.class);
 
@@ -103,214 +37,74 @@ public final class DispatcherImpl implements Dispatcher, SessionParent {
 
        private final EventLoopGroup workerGroup;
 
-       /**
-        * Timer object grouping FSM Timers
-        */
-       private final Timer stateTimer;
-
-       private final ProtocolMessageFactory messageFactory;
-
-       private final Map<ProtocolServer, Channel> serverSessions;
-
-       private final Map<ProtocolSession, Channel> clientSessions;
-
-       public DispatcherImpl(final ProtocolMessageFactory factory) {
+       public DispatcherImpl() {
+               // FIXME: we should get these as arguments
                this.bossGroup = new NioEventLoopGroup();
                this.workerGroup = new NioEventLoopGroup();
-               this.stateTimer = new Timer();
-               this.messageFactory = factory;
-               this.clientSessions = Maps.newHashMap();
-               this.serverSessions = Maps.newHashMap();
        }
 
        @Override
-       public Future<ProtocolServer> createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
-                       final ProtocolSessionFactory<?> sessionFactory) {
-               final ProtocolServer server = new ProtocolServer(address, connectionFactory, sessionFactory, this);
+       public <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> ChannelFuture createServer(
+                       final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory,
+                       final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolMessageFactory<M> messageFactory) {
                final ServerBootstrap b = new ServerBootstrap();
                b.group(this.bossGroup, this.workerGroup);
                b.channel(NioServerSocketChannel.class);
                b.option(ChannelOption.SO_BACKLOG, 128);
-               b.childHandler(new ServerChannelInitializer(server));
+               b.childHandler(new ChannelInitializerImpl<M, S, L>(negotiatorFactory,
+                               listenerFactory, new ProtocolHandlerFactory<M>(messageFactory), new DefaultPromise<S>(GlobalEventExecutor.INSTANCE)));
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
 
                // Bind and start to accept incoming connections.
                final ChannelFuture f = b.bind(address);
-               final Promise<ProtocolServer> p = new DefaultPromise<ProtocolServer>() {
-                       @Override
-                       public boolean cancel(final boolean mayInterruptIfRunning) {
-                               if (super.cancel(mayInterruptIfRunning)) {
-                                       f.cancel(mayInterruptIfRunning);
-                                       return true;
-                               }
-
-                               return false;
-                       }
-               };
+               logger.debug("Initiated server {} at {}.", f, address);
+               return f;
 
-               f.addListener(new ChannelFutureListener() {
-                       @Override
-                       public void operationComplete(final ChannelFuture cf) {
-                               // User cancelled, we need to make sure the server is closed
-                               if (p.isCancelled() && cf.isSuccess()) {
-                                       cf.channel().close();
-                                       return;
-                               }
-
-                               if (cf.isSuccess()) {
-                                       p.setSuccess(server);
-                                       synchronized (DispatcherImpl.this.serverSessions) {
-                                               DispatcherImpl.this.serverSessions.put(server, cf.channel());
-                                       }
-                               } else {
-                                       p.setFailure(cf.cause());
-                               }
-                       }
-               });
-
-               logger.debug("Created server {}.", server);
-               return p;
        }
 
-       @ThreadSafe
-       private final class ProtocolSessionPromise<T extends ProtocolSession> extends DefaultPromise<T> {
-               private final ClientChannelInitializer<T> init;
-               private final ProtocolConnection connection;
-               private final ReconnectStrategy strategy;
-               private final Bootstrap b;
-
-               @GuardedBy("this")
-               private Future<?> pending;
-
-               ProtocolSessionPromise(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
-                       this.connection = Preconditions.checkNotNull(connection);
-                       this.strategy = Preconditions.checkNotNull(strategy);
-
-                       init = new ClientChannelInitializer<T>(connection, sfactory);
-                       b = new Bootstrap();
-                       b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
-               }
-
-               private synchronized void connect() {
-                       final Object lock = this;
-
-                       try {
-                               final int timeout = strategy.getConnectTimeout();
-                               b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
-                               pending = b.connect(connection.getPeerAddress()).addListener(new ChannelFutureListener() {
-                                       @Override
-                                       public void operationComplete(final ChannelFuture cf) throws Exception {
-                                               synchronized (lock) {
-                                                       // Triggered when a connection attempt is resolved.
-                                                       Preconditions.checkState(pending == cf);
-
-                                                       /*
-                                                        * The promise we gave out could have been cancelled,
-                                                        * which cascades to the connect getting cancelled,
-                                                        * but there is a slight race window, where the connect
-                                                        * is already resolved, but the listener has not yet
-                                                        * been notified -- cancellation at that point won't
-                                                        * stop the notification arriving, so we have to close
-                                                        * the race here.
-                                                        */
-                                                       if (isCancelled()) {
-                                                               if (cf.isSuccess()) {
-                                                                       cf.channel().close();
-                                                               }
-                                                               return;
-                                                       }
-
-                                                       // FIXME: check cancellation
-
-                                                       if (cf.isSuccess()) {
-                                                               final T s = init.getSession();
-                                                               setSuccess(s);
-                                                               strategy.reconnectSuccessful();
-                                                               synchronized (DispatcherImpl.this.clientSessions) {
-                                                                       DispatcherImpl.this.clientSessions.put(s, cf.channel());
-                                                               }
-                                                       } else {
-                                                               final Future<Void> rf = strategy.scheduleReconnect();
-                                                               rf.addListener(new FutureListener<Void>() {
-                                                                       @Override
-                                                                       public void operationComplete(final Future<Void> sf) {
-                                                                               synchronized (lock) {
-                                                                                       // Triggered when a connection attempt is to be made.
-                                                                                       Preconditions.checkState(pending == sf);
-
-                                                                                       /*
-                                                                                        * The promise we gave out could have been cancelled,
-                                                                                        * which cascades to the reconnect attempt getting
-                                                                                        * cancelled, but there is a slight race window, where
-                                                                                        * the reconnect attempt is already enqueued, but the
-                                                                                        * listener has not yet been notified -- if cancellation
-                                                                                        * happens at that point, we need to catch it here.
-                                                                                        */
-                                                                                       if (!isCancelled()) {
-                                                                                               if (sf.isSuccess()) {
-                                                                                                       connect();
-                                                                                               } else {
-                                                                                                       setFailure(sf.cause());
-                                                                                               }
-                                                                                       }
-                                                                               }
-                                                                       }
-                                                               });
+       @Override
+       public <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<S> createClient(
+                       final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
+                       final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategy strategy) {
+               final ProtocolSessionPromise<M, S, L> p = new ProtocolSessionPromise<M, S, L>(workerGroup, address, negotiatorFactory,
+                               new SessionListenerFactory<L>() {
+                       private boolean created = false;
 
-                                                               pending = rf;
-                                                       }
-                                               }
-                                       }
-                               });
-                       } catch (Exception e) {
-                               setFailure(e);
+                       @Override
+                       public synchronized L getSessionListener() {
+                               Preconditions.checkState(created == false);
+                               created = true;
+                               return listener;
                        }
-               }
 
-               @Override
-               public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-                       if (super.cancel(mayInterruptIfRunning)) {
-                               pending.cancel(mayInterruptIfRunning);
-                               return true;
-                       }
+               }, new ProtocolHandlerFactory<M>(messageFactory), strategy);
 
-                       return false;
-               }
+               p.connect();
+               logger.debug("Client created.");
+               return p;
        }
 
        @Override
-       public <T extends ProtocolSession> Future<T> createClient(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
-               final ProtocolSessionPromise<T> p = new ProtocolSessionPromise<>(connection, sfactory, strategy);
+       public <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<Void> createReconnectingClient(
+                       final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
+                       final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategyFactory connectStrategyFactory,
+                       final ReconnectStrategy reestablishStrategy) {
+
+               final ReconnectPromise<M, S, L> p = new ReconnectPromise<M, S, L>(this, address, listener, negotiatorFactory,
+                               messageFactory, connectStrategyFactory, reestablishStrategy);
+
                p.connect();
 
-               logger.debug("Client created.");
                return p;
-       }
 
-       @Override
-       public void close() throws IOException {
-               this.workerGroup.shutdownGracefully();
-               this.bossGroup.shutdownGracefully();
        }
 
        @Override
-       public void onSessionClosed(final ProtocolSession session) {
-               synchronized (this.clientSessions) {
-                       logger.trace("Removing client session: {}", session);
-                       final Channel ch = this.clientSessions.get(session);
-                       ch.close();
-                       this.clientSessions.remove(session);
-                       logger.debug("Removed client session: {}", session.toString());
-               }
-       }
-
-       void onServerClosed(final ProtocolServer server) {
-               synchronized (this.serverSessions) {
-                       logger.trace("Removing server session: {}", server);
-                       final Channel ch = this.serverSessions.get(server);
-                       ch.close();
-                       this.clientSessions.remove(server);
-                       logger.debug("Removed server session: {}", server.toString());
+       public void close() {
+               try {
+                       this.workerGroup.shutdownGracefully();
+               } finally {
+                       this.bossGroup.shutdownGracefully();
                }
        }
 }