Add custom EXI buffer management
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfDispatcher.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.bootstrap.ServerBootstrap;
13 import io.netty.buffer.PooledByteBufAllocator;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelInitializer;
17 import io.netty.channel.ChannelOption;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.channel.ServerChannel;
20 import io.netty.channel.local.LocalServerChannel;
21 import io.netty.channel.socket.SocketChannel;
22 import io.netty.channel.socket.nio.NioServerSocketChannel;
23 import io.netty.channel.socket.nio.NioSocketChannel;
24 import io.netty.util.concurrent.DefaultPromise;
25 import io.netty.util.concurrent.EventExecutor;
26 import io.netty.util.concurrent.Future;
27 import io.netty.util.concurrent.GlobalEventExecutor;
28 import io.netty.util.concurrent.Promise;
29 import java.io.Closeable;
30 import java.net.InetSocketAddress;
31 import java.net.SocketAddress;
32 import org.opendaylight.netconf.api.NetconfSession;
33 import org.opendaylight.netconf.api.NetconfSessionListener;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
39  * start method that will handle sockets in different thread.
40  */
41 @Deprecated
42 public abstract class AbstractNetconfDispatcher<S extends NetconfSession, L extends NetconfSessionListener<? super S>>
43         implements Closeable {
44
45     protected interface ChannelPipelineInitializer<C extends Channel, S extends NetconfSession> {
46         /**
47          * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore
48          * this method needs to be implemented in protocol specific Dispatchers.
49          *
50          * @param channel whose pipeline should be defined, also to be passed to {@link NetconfSessionNegotiatorFactory}
51          * @param promise to be passed to {@link NetconfSessionNegotiatorFactory}
52          */
53         void initializeChannel(C channel, Promise<S> promise);
54     }
55
56     protected interface PipelineInitializer<S extends NetconfSession>
57         extends ChannelPipelineInitializer<SocketChannel, S> {
58
59     }
60
61
62     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfDispatcher.class);
63
64     private final EventLoopGroup bossGroup;
65
66     private final EventLoopGroup workerGroup;
67
68     private final EventExecutor executor;
69
70     protected AbstractNetconfDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
71         this(GlobalEventExecutor.INSTANCE, bossGroup, workerGroup);
72     }
73
74     protected AbstractNetconfDispatcher(final EventExecutor executor, final EventLoopGroup bossGroup,
75             final EventLoopGroup workerGroup) {
76         this.bossGroup = Preconditions.checkNotNull(bossGroup);
77         this.workerGroup = Preconditions.checkNotNull(workerGroup);
78         this.executor = Preconditions.checkNotNull(executor);
79     }
80
81
82     /**
83      * Creates server. Each server needs factories to pass their instances to client sessions.
84      *
85      * @param address address to which the server should be bound
86      * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
87      *
88      * @return ChannelFuture representing the binding process
89      */
90     protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
91         return createServer(address, NioServerSocketChannel.class, initializer);
92     }
93
94     /**
95      * Creates server. Each server needs factories to pass their instances to client sessions.
96      *
97      * @param address address to which the server should be bound
98      * @param channelClass The {@link Class} which is used to create {@link Channel} instances from.
99      * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
100      *
101      * @return ChannelFuture representing the binding process
102      */
103     protected <C extends Channel> ChannelFuture createServer(final SocketAddress address,
104             final Class<? extends ServerChannel> channelClass, final ChannelPipelineInitializer<C, S> initializer) {
105         final ServerBootstrap b = new ServerBootstrap();
106         b.childHandler(new ChannelInitializer<C>() {
107
108             @Override
109             protected void initChannel(final C ch) {
110                 initializer.initializeChannel(ch, new DefaultPromise<>(executor));
111             }
112         });
113
114         b.option(ChannelOption.SO_BACKLOG, 128);
115         if (LocalServerChannel.class.equals(channelClass) == false) {
116             // makes no sense for LocalServer and produces warning
117             b.childOption(ChannelOption.SO_KEEPALIVE, true);
118             b.childOption(ChannelOption.TCP_NODELAY , true);
119         }
120         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
121         customizeBootstrap(b);
122
123         if (b.group() == null) {
124             b.group(bossGroup, workerGroup);
125         }
126         try {
127             b.channel(channelClass);
128         } catch (final IllegalStateException e) {
129             // FIXME: if this is ok, document why
130             LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
131         }
132
133         // Bind and start to accept incoming connections.
134         final ChannelFuture f = b.bind(address);
135         LOG.debug("Initiated server {} at {}.", f, address);
136         return f;
137     }
138
139     /**
140      * Customize a server bootstrap before the server is created. This allows
141      * subclasses to assign non-default server options before the server is
142      * created.
143      *
144      * @param bootstrap Server bootstrap
145      */
146     protected void customizeBootstrap(final ServerBootstrap bootstrap) {
147         // The default is a no-op
148     }
149
150     /**
151      * Customize a client bootstrap before the connection is attempted. This
152      * allows subclasses to assign non-default options before the client is
153      * created.
154      *
155      * @param bootstrap Client bootstrap
156      */
157     protected void customizeBootstrap(final Bootstrap bootstrap) {
158         // The default is a no-op
159     }
160
161     /**
162      * Creates a client.
163      *
164      * @param address remote address
165      * @param strategy Reconnection strategy to be used when initial connection fails
166      *
167      * @return Future representing the connection process. Its result represents the combined success of TCP connection
168      *         as well as session negotiation.
169      */
170     protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
171             final PipelineInitializer<S> initializer) {
172         final Bootstrap b = new Bootstrap();
173         final NetconfSessionPromise<S> p = new NetconfSessionPromise<>(executor, address, strategy, b);
174         b.option(ChannelOption.SO_KEEPALIVE, true).handler(
175                 new ChannelInitializer<SocketChannel>() {
176                     @Override
177                     protected void initChannel(final SocketChannel ch) {
178                         initializer.initializeChannel(ch, p);
179                     }
180                 });
181
182         customizeBootstrap(b);
183         setWorkerGroup(b);
184         setChannelFactory(b);
185
186         p.connect();
187         LOG.debug("Client created.");
188         return p;
189     }
190
191     /**
192      * Create a client but use a pre-configured bootstrap.
193      * This method however replaces the ChannelInitializer in the bootstrap. All other configuration is preserved.
194      *
195      * @param address remote address
196      */
197     protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
198             final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
199         final NetconfSessionPromise<S> p = new NetconfSessionPromise<>(executor, address, strategy, bootstrap);
200
201         bootstrap.handler(
202                 new ChannelInitializer<SocketChannel>() {
203                     @Override
204                     protected void initChannel(final SocketChannel ch) {
205                         initializer.initializeChannel(ch, p);
206                     }
207                 });
208
209         p.connect();
210         LOG.debug("Client created.");
211         return p;
212     }
213
214     /**
215      * Creates a client.
216      *
217      * @param address remote address
218      * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
219      * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
220      * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
221      *         success if it indicates no further attempts should be made and failure if it reports an error
222      * @deprecated Use
223      *             {@link #createReconnectingClient(InetSocketAddress, ReconnectStrategyFactory, PipelineInitializer)}
224      *             instead.
225      */
226     @Deprecated
227     protected Future<Void> createReconnectingClient(final InetSocketAddress address,
228             final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
229             final PipelineInitializer<S> initializer) {
230         return createReconnectingClient(address, connectStrategyFactory, initializer);
231     }
232
233     /**
234      * Creates a reconnecting client.
235      *
236      * @param address remote address
237      * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt
238      * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
239      *         success is never reported, only failure when it runs out of reconnection attempts.
240      */
241     protected Future<Void> createReconnectingClient(final InetSocketAddress address,
242             final ReconnectStrategyFactory connectStrategyFactory, final PipelineInitializer<S> initializer) {
243         final Bootstrap b = new Bootstrap();
244
245         final ReconnectPromise<S, L> p = new ReconnectPromise<>(GlobalEventExecutor.INSTANCE, this, address,
246                 connectStrategyFactory, b, initializer);
247
248         b.option(ChannelOption.SO_KEEPALIVE, true);
249
250         customizeBootstrap(b);
251         setWorkerGroup(b);
252         setChannelFactory(b);
253
254         p.connect();
255         return p;
256     }
257
258     private static void setChannelFactory(final Bootstrap bootstrap) {
259         // There is no way to detect if this was already set by
260         // customizeBootstrap()
261         try {
262             bootstrap.channel(NioSocketChannel.class);
263         } catch (final IllegalStateException e) {
264             LOG.trace("Not overriding channelFactory on bootstrap {}", bootstrap, e);
265         }
266     }
267
268     private void setWorkerGroup(final Bootstrap bootstrap) {
269         if (bootstrap.group() == null) {
270             bootstrap.group(workerGroup);
271         }
272     }
273
274     @Deprecated
275     @Override
276     public void close() {
277     }
278 }