*/
package org.opendaylight.protocol.framework;
-import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.GlobalEventExecutor;
-import java.io.IOException;
+import java.io.Closeable;
import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Timer;
-
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
/**
* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
* start method that will handle sockets in different thread.
*/
-public final class DispatcherImpl implements Dispatcher, SessionParent {
-
- final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ProtocolServer server;
-
- private ProtocolSession session;
-
- public ServerChannelInitializer(final ProtocolServer server) {
- this.server = server;
- }
-
- @Override
- protected void initChannel(final SocketChannel ch) throws Exception {
- final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
- ch.pipeline().addFirst("decoder", factory.getDecoder());
- this.session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
-
- ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
- ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
- }
-
- public ProtocolSession getSession() {
- return this.session;
- }
-
- }
-
- final class ClientChannelInitializer<T extends ProtocolSession> extends ChannelInitializer<SocketChannel> {
-
- private final ProtocolSessionFactory<T> sfactory;
-
- private final ProtocolConnection connection;
-
- private T session;
-
- public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) {
- this.connection = connection;
- this.sfactory = sfactory;
- }
-
- @Override
- protected void initChannel(final SocketChannel ch) throws Exception {
- final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
- ch.pipeline().addFirst("decoder", factory.getDecoder());
- this.session = this.sfactory.getProtocolSession(DispatcherImpl.this, DispatcherImpl.this.stateTimer, this.connection, 0, ch);
- ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
- ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
- }
-
- T getSession() {
- return this.session;
- }
- }
+public final class DispatcherImpl implements Closeable, Dispatcher {
private static final Logger logger = LoggerFactory.getLogger(DispatcherImpl.class);
private final EventLoopGroup workerGroup;
- /**
- * Timer object grouping FSM Timers
- */
- private final Timer stateTimer;
-
- private final ProtocolMessageFactory messageFactory;
-
- private final Map<ProtocolServer, Channel> serverSessions;
-
- private final Map<ProtocolSession, Channel> clientSessions;
-
- public DispatcherImpl(final ProtocolMessageFactory factory) {
+ public DispatcherImpl() {
+ // FIXME: we should get these as arguments
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
- this.stateTimer = new Timer();
- this.messageFactory = factory;
- this.clientSessions = Maps.newHashMap();
- this.serverSessions = Maps.newHashMap();
}
@Override
- public Future<ProtocolServer> createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
- final ProtocolSessionFactory<?> sessionFactory) {
- final ProtocolServer server = new ProtocolServer(address, connectionFactory, sessionFactory, this);
+ public <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> ChannelFuture createServer(
+ final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory,
+ final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolMessageFactory<M> messageFactory) {
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_BACKLOG, 128);
- b.childHandler(new ServerChannelInitializer(server));
+ b.childHandler(new ChannelInitializerImpl<M, S, L>(negotiatorFactory,
+ listenerFactory, new ProtocolHandlerFactory<M>(messageFactory), new DefaultPromise<S>(GlobalEventExecutor.INSTANCE)));
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
final ChannelFuture f = b.bind(address);
- final Promise<ProtocolServer> p = new DefaultPromise<ProtocolServer>() {
- @Override
- public boolean cancel(final boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- f.cancel(mayInterruptIfRunning);
- return true;
- }
-
- return false;
- }
- };
+ logger.debug("Initiated server {} at {}.", f, address);
+ return f;
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture cf) {
- // User cancelled, we need to make sure the server is closed
- if (p.isCancelled() && cf.isSuccess()) {
- cf.channel().close();
- return;
- }
-
- if (cf.isSuccess()) {
- p.setSuccess(server);
- synchronized (DispatcherImpl.this.serverSessions) {
- DispatcherImpl.this.serverSessions.put(server, cf.channel());
- }
- } else {
- p.setFailure(cf.cause());
- }
- }
- });
-
- logger.debug("Created server {}.", server);
- return p;
}
- @ThreadSafe
- private final class ProtocolSessionPromise<T extends ProtocolSession> extends DefaultPromise<T> {
- private final ClientChannelInitializer<T> init;
- private final ProtocolConnection connection;
- private final ReconnectStrategy strategy;
- private final Bootstrap b;
-
- @GuardedBy("this")
- private Future<?> pending;
-
- ProtocolSessionPromise(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
- this.connection = Preconditions.checkNotNull(connection);
- this.strategy = Preconditions.checkNotNull(strategy);
-
- init = new ClientChannelInitializer<T>(connection, sfactory);
- b = new Bootstrap();
- b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
- }
-
- private synchronized void connect() {
- final Object lock = this;
-
- try {
- final int timeout = strategy.getConnectTimeout();
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
- pending = b.connect(connection.getPeerAddress()).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture cf) throws Exception {
- synchronized (lock) {
- // Triggered when a connection attempt is resolved.
- Preconditions.checkState(pending == cf);
-
- /*
- * The promise we gave out could have been cancelled,
- * which cascades to the connect getting cancelled,
- * but there is a slight race window, where the connect
- * is already resolved, but the listener has not yet
- * been notified -- cancellation at that point won't
- * stop the notification arriving, so we have to close
- * the race here.
- */
- if (isCancelled()) {
- if (cf.isSuccess()) {
- cf.channel().close();
- }
- return;
- }
-
- // FIXME: check cancellation
-
- if (cf.isSuccess()) {
- final T s = init.getSession();
- setSuccess(s);
- strategy.reconnectSuccessful();
- synchronized (DispatcherImpl.this.clientSessions) {
- DispatcherImpl.this.clientSessions.put(s, cf.channel());
- }
- } else {
- final Future<Void> rf = strategy.scheduleReconnect();
- rf.addListener(new FutureListener<Void>() {
- @Override
- public void operationComplete(final Future<Void> sf) {
- synchronized (lock) {
- // Triggered when a connection attempt is to be made.
- Preconditions.checkState(pending == sf);
-
- /*
- * The promise we gave out could have been cancelled,
- * which cascades to the reconnect attempt getting
- * cancelled, but there is a slight race window, where
- * the reconnect attempt is already enqueued, but the
- * listener has not yet been notified -- if cancellation
- * happens at that point, we need to catch it here.
- */
- if (!isCancelled()) {
- if (sf.isSuccess()) {
- connect();
- } else {
- setFailure(sf.cause());
- }
- }
- }
- }
- });
+ @Override
+ public <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<S> createClient(
+ final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
+ final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategy strategy) {
+ final ProtocolSessionPromise<M, S, L> p = new ProtocolSessionPromise<M, S, L>(workerGroup, address, negotiatorFactory,
+ new SessionListenerFactory<L>() {
+ private boolean created = false;
- pending = rf;
- }
- }
- }
- });
- } catch (Exception e) {
- setFailure(e);
+ @Override
+ public synchronized L getSessionListener() {
+ Preconditions.checkState(created == false);
+ created = true;
+ return listener;
}
- }
- @Override
- public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- pending.cancel(mayInterruptIfRunning);
- return true;
- }
+ }, new ProtocolHandlerFactory<M>(messageFactory), strategy);
- return false;
- }
+ p.connect();
+ logger.debug("Client created.");
+ return p;
}
@Override
- public <T extends ProtocolSession> Future<T> createClient(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
- final ProtocolSessionPromise<T> p = new ProtocolSessionPromise<>(connection, sfactory, strategy);
+ public <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<Void> createReconnectingClient(
+ final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
+ final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategyFactory connectStrategyFactory,
+ final ReconnectStrategy reestablishStrategy) {
+
+ final ReconnectPromise<M, S, L> p = new ReconnectPromise<M, S, L>(this, address, listener, negotiatorFactory,
+ messageFactory, connectStrategyFactory, reestablishStrategy);
+
p.connect();
- logger.debug("Client created.");
return p;
- }
- @Override
- public void close() throws IOException {
- this.workerGroup.shutdownGracefully();
- this.bossGroup.shutdownGracefully();
}
@Override
- public void onSessionClosed(final ProtocolSession session) {
- synchronized (this.clientSessions) {
- logger.trace("Removing client session: {}", session);
- final Channel ch = this.clientSessions.get(session);
- ch.close();
- this.clientSessions.remove(session);
- logger.debug("Removed client session: {}", session.toString());
- }
- }
-
- void onServerClosed(final ProtocolServer server) {
- synchronized (this.serverSessions) {
- logger.trace("Removing server session: {}", server);
- final Channel ch = this.serverSessions.get(server);
- ch.close();
- this.clientSessions.remove(server);
- logger.debug("Removed server session: {}", server.toString());
+ public void close() {
+ try {
+ this.workerGroup.shutdownGracefully();
+ } finally {
+ this.bossGroup.shutdownGracefully();
}
}
}