2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.framework;
10 import io.netty.bootstrap.Bootstrap;
11 import io.netty.bootstrap.ServerBootstrap;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelFutureListener;
15 import io.netty.channel.ChannelInitializer;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.nio.NioEventLoopGroup;
19 import io.netty.channel.socket.SocketChannel;
20 import io.netty.channel.socket.nio.NioServerSocketChannel;
21 import io.netty.channel.socket.nio.NioSocketChannel;
22 import io.netty.util.concurrent.DefaultPromise;
23 import io.netty.util.concurrent.Future;
24 import io.netty.util.concurrent.FutureListener;
25 import io.netty.util.concurrent.Promise;
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
30 import java.util.Timer;
32 import javax.annotation.concurrent.GuardedBy;
33 import javax.annotation.concurrent.ThreadSafe;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 import com.google.common.base.Preconditions;
39 import com.google.common.collect.Maps;
42 * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
43 * start method that will handle sockets in different thread.
45 public final class DispatcherImpl implements Dispatcher, SessionParent {
47 final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
49 private final ProtocolServer server;
51 private ProtocolSession session;
53 public ServerChannelInitializer(final ProtocolServer server) {
58 protected void initChannel(final SocketChannel ch) throws Exception {
59 final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
60 ch.pipeline().addFirst("decoder", factory.getDecoder());
61 this.session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
63 ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
64 ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
67 public ProtocolSession getSession() {
73 final class ClientChannelInitializer<T extends ProtocolSession> extends ChannelInitializer<SocketChannel> {
75 private final ProtocolSessionFactory<T> sfactory;
77 private final ProtocolConnection connection;
81 public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) {
82 this.connection = connection;
83 this.sfactory = sfactory;
87 protected void initChannel(final SocketChannel ch) throws Exception {
88 final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
89 ch.pipeline().addFirst("decoder", factory.getDecoder());
90 this.session = this.sfactory.getProtocolSession(DispatcherImpl.this, DispatcherImpl.this.stateTimer, this.connection, 0, ch);
91 ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
92 ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
100 private static final Logger logger = LoggerFactory.getLogger(DispatcherImpl.class);
102 private final EventLoopGroup bossGroup;
104 private final EventLoopGroup workerGroup;
107 * Timer object grouping FSM Timers
109 private final Timer stateTimer;
111 private final ProtocolMessageFactory messageFactory;
113 private final Map<ProtocolServer, Channel> serverSessions;
115 private final Map<ProtocolSession, Channel> clientSessions;
117 public DispatcherImpl(final ProtocolMessageFactory factory) {
118 this.bossGroup = new NioEventLoopGroup();
119 this.workerGroup = new NioEventLoopGroup();
120 this.stateTimer = new Timer();
121 this.messageFactory = factory;
122 this.clientSessions = Maps.newHashMap();
123 this.serverSessions = Maps.newHashMap();
127 public Future<ProtocolServer> createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
128 final ProtocolSessionFactory<?> sessionFactory) {
129 final ProtocolServer server = new ProtocolServer(address, connectionFactory, sessionFactory, this);
130 final ServerBootstrap b = new ServerBootstrap();
131 b.group(this.bossGroup, this.workerGroup);
132 b.channel(NioServerSocketChannel.class);
133 b.option(ChannelOption.SO_BACKLOG, 128);
134 b.childHandler(new ServerChannelInitializer(server));
135 b.childOption(ChannelOption.SO_KEEPALIVE, true);
137 // Bind and start to accept incoming connections.
138 final ChannelFuture f = b.bind(address);
139 final Promise<ProtocolServer> p = new DefaultPromise<ProtocolServer>() {
141 public boolean cancel(final boolean mayInterruptIfRunning) {
142 if (super.cancel(mayInterruptIfRunning)) {
143 f.cancel(mayInterruptIfRunning);
151 f.addListener(new ChannelFutureListener() {
153 public void operationComplete(final ChannelFuture cf) {
154 // User cancelled, we need to make sure the server is closed
155 if (p.isCancelled() && cf.isSuccess()) {
156 cf.channel().close();
160 if (cf.isSuccess()) {
161 p.setSuccess(server);
162 synchronized (DispatcherImpl.this.serverSessions) {
163 DispatcherImpl.this.serverSessions.put(server, cf.channel());
166 p.setFailure(cf.cause());
171 logger.debug("Created server {}.", server);
176 private final class ProtocolSessionPromise<T extends ProtocolSession> extends DefaultPromise<T> {
177 private final ClientChannelInitializer<T> init;
178 private final ProtocolConnection connection;
179 private final ReconnectStrategy strategy;
180 private final Bootstrap b;
183 private Future<?> pending;
185 ProtocolSessionPromise(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
186 this.connection = Preconditions.checkNotNull(connection);
187 this.strategy = Preconditions.checkNotNull(strategy);
189 init = new ClientChannelInitializer<T>(connection, sfactory);
191 b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
194 private synchronized void connect() {
195 final Object lock = this;
198 final int timeout = strategy.getConnectTimeout();
199 b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
200 pending = b.connect(connection.getPeerAddress()).addListener(new ChannelFutureListener() {
202 public void operationComplete(final ChannelFuture cf) throws Exception {
203 synchronized (lock) {
204 // Triggered when a connection attempt is resolved.
205 Preconditions.checkState(pending == cf);
208 * The promise we gave out could have been cancelled,
209 * which cascades to the connect getting cancelled,
210 * but there is a slight race window, where the connect
211 * is already resolved, but the listener has not yet
212 * been notified -- cancellation at that point won't
213 * stop the notification arriving, so we have to close
217 if (cf.isSuccess()) {
218 cf.channel().close();
223 // FIXME: check cancellation
225 if (cf.isSuccess()) {
226 final T s = init.getSession();
228 strategy.reconnectSuccessful();
229 synchronized (DispatcherImpl.this.clientSessions) {
230 DispatcherImpl.this.clientSessions.put(s, cf.channel());
233 final Future<Void> rf = strategy.scheduleReconnect();
234 rf.addListener(new FutureListener<Void>() {
236 public void operationComplete(final Future<Void> sf) {
237 synchronized (lock) {
238 // Triggered when a connection attempt is to be made.
239 Preconditions.checkState(pending == sf);
242 * The promise we gave out could have been cancelled,
243 * which cascades to the reconnect attempt getting
244 * cancelled, but there is a slight race window, where
245 * the reconnect attempt is already enqueued, but the
246 * listener has not yet been notified -- if cancellation
247 * happens at that point, we need to catch it here.
249 if (!isCancelled()) {
250 if (sf.isSuccess()) {
253 setFailure(sf.cause());
265 } catch (Exception e) {
271 public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
272 if (super.cancel(mayInterruptIfRunning)) {
273 pending.cancel(mayInterruptIfRunning);
282 public <T extends ProtocolSession> Future<T> createClient(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
283 final ProtocolSessionPromise<T> p = new ProtocolSessionPromise<>(connection, sfactory, strategy);
286 logger.debug("Client created.");
291 public void close() throws IOException {
292 this.workerGroup.shutdownGracefully();
293 this.bossGroup.shutdownGracefully();
297 public void onSessionClosed(final ProtocolSession session) {
298 synchronized (this.clientSessions) {
299 logger.trace("Removing client session: {}", session);
300 final Channel ch = this.clientSessions.get(session);
302 this.clientSessions.remove(session);
303 logger.debug("Removed client session: {}", session.toString());
307 void onServerClosed(final ProtocolServer server) {
308 synchronized (this.serverSessions) {
309 logger.trace("Removing server session: {}", server);
310 final Channel ch = this.serverSessions.get(server);
312 this.clientSessions.remove(server);
313 logger.debug("Removed server session: {}", server.toString());