- @VisibleForTesting
- public synchronized Future<BGPSessionImpl> createClient(final InetSocketAddress localAddress,
- final InetSocketAddress remoteAddress, final int retryTimer, final boolean reuseAddress) {
- final Bootstrap clientBootStrap = createClientBootStrap(KeyMapping.getKeyMapping(), reuseAddress);
- clientBootStrap.localAddress(localAddress);
- return createClient(remoteAddress, retryTimer, clientBootStrap);
- }
-
- private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress) {
- final Bootstrap bootstrap = new Bootstrap();
- if (Epoll.isAvailable()) {
- bootstrap.channel(EpollSocketChannel.class);
- bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
- } else {
- bootstrap.channel(NioSocketChannel.class);
- }
- if (keys != null && !keys.isEmpty()) {
- if (Epoll.isAvailable()) {
- bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
- } else {
- throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
- }
- }
-
- // Make sure we are doing round-robin processing
- bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(FIX_BUFFER_SIZE));
- bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
- bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK);
- bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
-
- if (bootstrap.config().group() == null) {
- bootstrap.group(this.workerGroup);
- }
-
- return bootstrap;
- }
-
- @Override
- public synchronized void close() throws InterruptedException {
- if (Epoll.isAvailable()) {
- LOG.debug("Closing Dispatcher");
- this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
- this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
- }
+ private synchronized Bootstrap createClientBootStrap(final KeyMapping keys, final boolean reuseAddress,
+ final InetSocketAddress localAddress) {
+ return nettyGroups.createBootstrap(keys)
+ // Make sure we are doing round-robin processing
+ .option(ChannelOption.RCVBUF_ALLOCATOR, RECV_ALLOCATOR)
+ .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, WATER_MARK)
+ .option(ChannelOption.SO_REUSEADDR, reuseAddress)
+ .localAddress(localAddress);