e513dd7566da2ecc8aa9120250df20e957ee21e5
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / DispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.bootstrap.Bootstrap;
11 import io.netty.bootstrap.ServerBootstrap;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelFutureListener;
15 import io.netty.channel.ChannelInitializer;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.nio.NioEventLoopGroup;
19 import io.netty.channel.socket.SocketChannel;
20 import io.netty.channel.socket.nio.NioServerSocketChannel;
21 import io.netty.channel.socket.nio.NioSocketChannel;
22 import io.netty.util.concurrent.DefaultPromise;
23 import io.netty.util.concurrent.Future;
24 import io.netty.util.concurrent.FutureListener;
25 import io.netty.util.concurrent.Promise;
26
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
29 import java.util.Map;
30 import java.util.Timer;
31
32 import javax.annotation.concurrent.GuardedBy;
33 import javax.annotation.concurrent.ThreadSafe;
34
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.google.common.base.Preconditions;
39 import com.google.common.collect.Maps;
40
/**
 * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and then run the
 * start method that will handle sockets in a different thread.
 */
45 public final class DispatcherImpl implements Dispatcher, SessionParent {
46
47         final class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
48
49                 private final ProtocolServer server;
50
51                 private ProtocolSession session;
52
53                 public ServerChannelInitializer(final ProtocolServer server) {
54                         this.server = server;
55                 }
56
57                 @Override
58                 protected void initChannel(final SocketChannel ch) throws Exception {
59                         final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
60                         ch.pipeline().addFirst("decoder", factory.getDecoder());
61                         this.session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
62
63                         ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
64                         ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
65                 }
66
67                 public ProtocolSession getSession() {
68                         return this.session;
69                 }
70
71         }
72
73         final class ClientChannelInitializer<T extends ProtocolSession> extends ChannelInitializer<SocketChannel> {
74
75                 private final ProtocolSessionFactory<T> sfactory;
76
77                 private final ProtocolConnection connection;
78
79                 private T session;
80
81                 public ClientChannelInitializer(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory) {
82                         this.connection = connection;
83                         this.sfactory = sfactory;
84                 }
85
86                 @Override
87                 protected void initChannel(final SocketChannel ch) throws Exception {
88                         final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
89                         ch.pipeline().addFirst("decoder", factory.getDecoder());
90                         this.session = this.sfactory.getProtocolSession(DispatcherImpl.this, DispatcherImpl.this.stateTimer, this.connection, 0, ch);
91                         ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
92                         ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
93                 }
94
95                 T getSession() {
96                         return this.session;
97                 }
98         }
99
        private static final Logger logger = LoggerFactory.getLogger(DispatcherImpl.class);

        // Passed as the first group argument to every server bootstrap created by createServer().
        private final EventLoopGroup bossGroup;

        // Passed as the second group argument to server bootstraps and as the only group of client bootstraps.
        private final EventLoopGroup workerGroup;

        /**
         * Timer object grouping FSM Timers
         */
        private final Timer stateTimer;

        // Handed to each ProtocolHandlerFactory built when a channel is initialized.
        private final ProtocolMessageFactory messageFactory;

        // Channel of each successfully bound server, keyed by the server. createServer() populates it while
        // synchronized on the map itself.
        private final Map<ProtocolServer, Channel> serverSessions;

        // Channel of each successfully connected client session, keyed by the session. Populated on connect and
        // pruned in onSessionClosed(), both while synchronized on the map itself.
        private final Map<ProtocolSession, Channel> clientSessions;
116
117         public DispatcherImpl(final ProtocolMessageFactory factory) {
118                 this.bossGroup = new NioEventLoopGroup();
119                 this.workerGroup = new NioEventLoopGroup();
120                 this.stateTimer = new Timer();
121                 this.messageFactory = factory;
122                 this.clientSessions = Maps.newHashMap();
123                 this.serverSessions = Maps.newHashMap();
124         }
125
126         @Override
127         public Future<ProtocolServer> createServer(final InetSocketAddress address, final ProtocolConnectionFactory connectionFactory,
128                         final ProtocolSessionFactory<?> sessionFactory) {
129                 final ProtocolServer server = new ProtocolServer(address, connectionFactory, sessionFactory, this);
130                 final ServerBootstrap b = new ServerBootstrap();
131                 b.group(this.bossGroup, this.workerGroup);
132                 b.channel(NioServerSocketChannel.class);
133                 b.option(ChannelOption.SO_BACKLOG, 128);
134                 b.childHandler(new ServerChannelInitializer(server));
135                 b.childOption(ChannelOption.SO_KEEPALIVE, true);
136
137                 // Bind and start to accept incoming connections.
138                 final ChannelFuture f = b.bind(address);
139                 final Promise<ProtocolServer> p = new DefaultPromise<ProtocolServer>() {
140                         @Override
141                         public boolean cancel(final boolean mayInterruptIfRunning) {
142                                 if (super.cancel(mayInterruptIfRunning)) {
143                                         f.cancel(mayInterruptIfRunning);
144                                         return true;
145                                 }
146
147                                 return false;
148                         }
149                 };
150
151                 f.addListener(new ChannelFutureListener() {
152                         @Override
153                         public void operationComplete(final ChannelFuture cf) {
154                                 // User cancelled, we need to make sure the server is closed
155                                 if (p.isCancelled() && cf.isSuccess()) {
156                                         cf.channel().close();
157                                         return;
158                                 }
159
160                                 if (cf.isSuccess()) {
161                                         p.setSuccess(server);
162                                         synchronized (DispatcherImpl.this.serverSessions) {
163                                                 DispatcherImpl.this.serverSessions.put(server, cf.channel());
164                                         }
165                                 } else {
166                                         p.setFailure(cf.cause());
167                                 }
168                         }
169                 });
170
171                 logger.debug("Created server {}.", server);
172                 return p;
173         }
174
175         @ThreadSafe
176         private final class ProtocolSessionPromise<T extends ProtocolSession> extends DefaultPromise<T> {
177                 private final ClientChannelInitializer<T> init;
178                 private final ProtocolConnection connection;
179                 private final ReconnectStrategy strategy;
180                 private final Bootstrap b;
181
182                 @GuardedBy("this")
183                 private Future<?> pending;
184
185                 ProtocolSessionPromise(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
186                         this.connection = Preconditions.checkNotNull(connection);
187                         this.strategy = Preconditions.checkNotNull(strategy);
188
189                         init = new ClientChannelInitializer<T>(connection, sfactory);
190                         b = new Bootstrap();
191                         b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
192                 }
193
194                 private synchronized void connect() {
195                         final Object lock = this;
196
197                         try {
198                                 final int timeout = strategy.getConnectTimeout();
199                                 b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
200                                 pending = b.connect(connection.getPeerAddress()).addListener(new ChannelFutureListener() {
201                                         @Override
202                                         public void operationComplete(final ChannelFuture cf) throws Exception {
203                                                 synchronized (lock) {
204                                                         // Triggered when a connection attempt is resolved.
205                                                         Preconditions.checkState(pending == cf);
206
207                                                         /*
208                                                          * The promise we gave out could have been cancelled,
209                                                          * which cascades to the connect getting cancelled,
210                                                          * but there is a slight race window, where the connect
211                                                          * is already resolved, but the listener has not yet
212                                                          * been notified -- cancellation at that point won't
213                                                          * stop the notification arriving, so we have to close
214                                                          * the race here.
215                                                          */
216                                                         if (isCancelled()) {
217                                                                 if (cf.isSuccess()) {
218                                                                         cf.channel().close();
219                                                                 }
220                                                                 return;
221                                                         }
222
223                                                         // FIXME: check cancellation
224
225                                                         if (cf.isSuccess()) {
226                                                                 final T s = init.getSession();
227                                                                 setSuccess(s);
228                                                                 strategy.reconnectSuccessful();
229                                                                 synchronized (DispatcherImpl.this.clientSessions) {
230                                                                         DispatcherImpl.this.clientSessions.put(s, cf.channel());
231                                                                 }
232                                                         } else {
233                                                                 final Future<Void> rf = strategy.scheduleReconnect();
234                                                                 rf.addListener(new FutureListener<Void>() {
235                                                                         @Override
236                                                                         public void operationComplete(final Future<Void> sf) {
237                                                                                 synchronized (lock) {
238                                                                                         // Triggered when a connection attempt is to be made.
239                                                                                         Preconditions.checkState(pending == sf);
240
241                                                                                         /*
242                                                                                          * The promise we gave out could have been cancelled,
243                                                                                          * which cascades to the reconnect attempt getting
244                                                                                          * cancelled, but there is a slight race window, where
245                                                                                          * the reconnect attempt is already enqueued, but the
246                                                                                          * listener has not yet been notified -- if cancellation
247                                                                                          * happens at that point, we need to catch it here.
248                                                                                          */
249                                                                                         if (!isCancelled()) {
250                                                                                                 if (sf.isSuccess()) {
251                                                                                                         connect();
252                                                                                                 } else {
253                                                                                                         setFailure(sf.cause());
254                                                                                                 }
255                                                                                         }
256                                                                                 }
257                                                                         }
258                                                                 });
259
260                                                                 pending = rf;
261                                                         }
262                                                 }
263                                         }
264                                 });
265                         } catch (Exception e) {
266                                 setFailure(e);
267                         }
268                 }
269
270                 @Override
271                 public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
272                         if (super.cancel(mayInterruptIfRunning)) {
273                                 pending.cancel(mayInterruptIfRunning);
274                                 return true;
275                         }
276
277                         return false;
278                 }
279         }
280
281         @Override
282         public <T extends ProtocolSession> Future<T> createClient(final ProtocolConnection connection, final ProtocolSessionFactory<T> sfactory, final ReconnectStrategy strategy) {
283                 final ProtocolSessionPromise<T> p = new ProtocolSessionPromise<>(connection, sfactory, strategy);
284                 p.connect();
285
286                 logger.debug("Client created.");
287                 return p;
288         }
289
290         @Override
291         public void close() throws IOException {
292                 this.workerGroup.shutdownGracefully();
293                 this.bossGroup.shutdownGracefully();
294         }
295
296         @Override
297         public void onSessionClosed(final ProtocolSession session) {
298                 synchronized (this.clientSessions) {
299                         logger.trace("Removing client session: {}", session);
300                         final Channel ch = this.clientSessions.get(session);
301                         ch.close();
302                         this.clientSessions.remove(session);
303                         logger.debug("Removed client session: {}", session.toString());
304                 }
305         }
306
307         void onServerClosed(final ProtocolServer server) {
308                 synchronized (this.serverSessions) {
309                         logger.trace("Removing server session: {}", server);
310                         final Channel ch = this.serverSessions.get(server);
311                         ch.close();
312                         this.clientSessions.remove(server);
313                         logger.debug("Removed server session: {}", server.toString());
314                 }
315         }
316 }