Refactor ServerFacade
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / TcpServerFacade.java
1 /*
2  * Copyright (c) 2013 Pantheon Technologies, s.r.o. and others. All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.openflowjava.protocol.impl.core;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import io.netty.bootstrap.Bootstrap;
16 import io.netty.bootstrap.ServerBootstrap;
17 import io.netty.buffer.PooledByteBufAllocator;
18 import io.netty.channel.ChannelFutureListener;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.WriteBufferWaterMark;
22 import io.netty.channel.epoll.Epoll;
23 import io.netty.channel.epoll.EpollEventLoopGroup;
24 import io.netty.channel.epoll.EpollServerSocketChannel;
25 import io.netty.channel.epoll.EpollSocketChannel;
26 import io.netty.channel.nio.NioEventLoopGroup;
27 import io.netty.channel.socket.nio.NioServerSocketChannel;
28 import io.netty.channel.socket.nio.NioSocketChannel;
29 import io.netty.handler.logging.LogLevel;
30 import io.netty.handler.logging.LoggingHandler;
31 import java.net.InetSocketAddress;
32 import org.checkerframework.checker.lock.qual.GuardedBy;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Class implementing server over TCP / TLS for handling incoming connections.
40  *
41  * @author michal.polkorab
42  */
43 final class TcpServerFacade extends ServerFacade implements ConnectionInitializer {
44     private static final Logger LOG = LoggerFactory.getLogger(TcpServerFacade.class);
45
46     /*
47      * High/low write watermarks
48      */
49     private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
50     private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
51     /*
52      * Write spin count. This tells Netty to immediately retry a non-blocking write this many times before moving on to
53      * selecting.
54      */
55     private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
56
57     private final TcpChannelInitializer channelInitializer;
58     private final Bootstrap bootstrap;
59
60     @GuardedBy("this")
61     private EventLoopGroup childGroup;
62
63     private TcpServerFacade(final EventLoopGroup parentGroup, final EventLoopGroup childGroup,
64             final Bootstrap bootstrap, final TcpChannelInitializer channelInitializer,
65             final InetSocketAddress localAddress) {
66         super(parentGroup, localAddress);
67         this.childGroup = requireNonNull(childGroup);
68         this.bootstrap = requireNonNull(bootstrap);
69         this.channelInitializer = requireNonNull(channelInitializer);
70
71         // Log-and-hook to prevent surprise timing
72         LOG.info("Switch listener started and ready to accept incoming TCP/TLS connections on {}", localAddress);
73     }
74
75     static ListenableFuture<TcpServerFacade> start(final ConnectionConfiguration connConfig, final boolean epollEnabled,
76             final TcpChannelInitializer channelInitializer) {
77         // Server bootstrap configuration
78         final var serverBootstrap = new ServerBootstrap()
79             .handler(new LoggingHandler(LogLevel.DEBUG))
80             .childHandler(channelInitializer)
81             .option(ChannelOption.SO_BACKLOG, 128)
82             .option(ChannelOption.SO_REUSEADDR, true)
83             .childOption(ChannelOption.SO_KEEPALIVE, true)
84             .childOption(ChannelOption.TCP_NODELAY , true)
85             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
86             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
87                 new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
88             .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
89
90         // Client bootstrap configuration
91         final var bootstrap = new Bootstrap().handler(channelInitializer);
92
93         /*
94          * Initialize groups.
95          *
96          * We generally do not perform IO-unrelated tasks, so we want to have all outstanding tasks completed before
97          * the executing thread goes back into select.
98          *
99          * Any other setting means Netty will measure the time it spent selecting and spend roughly proportional time
100          * executing tasks.
101          */
102         final var threadConfig = connConfig.getThreadConfiguration();
103         final var childIoRatio = 100;
104
105         // Captured by bindFuture callback below
106         final EventLoopGroup parentGroup;
107         final EventLoopGroup childGroup;
108         if (Epoll.isAvailable() && epollEnabled) {
109             // Epoll
110             serverBootstrap.channel(EpollServerSocketChannel.class);
111             bootstrap.channel(EpollSocketChannel.class);
112
113             parentGroup = new EpollEventLoopGroup(threadConfig == null ? 0 : threadConfig.getBossThreadCount());
114             final var tmp = new EpollEventLoopGroup(threadConfig == null ? 0 : threadConfig.getWorkerThreadCount());
115             tmp.setIoRatio(childIoRatio);
116             childGroup = tmp;
117         } else {
118             // NIO
119             serverBootstrap.channel(NioServerSocketChannel.class);
120             bootstrap.channel(NioSocketChannel.class);
121
122             parentGroup = threadConfig == null ? new NioEventLoopGroup()
123                 : new NioEventLoopGroup(threadConfig.getBossThreadCount());
124
125             final var tmp = threadConfig == null ? new NioEventLoopGroup()
126                 : new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
127             tmp.setIoRatio(childIoRatio);
128             childGroup = tmp;
129         }
130         serverBootstrap.group(parentGroup, childGroup);
131         bootstrap.group(childGroup);
132
133         // Attempt to bind the address
134         final var address = connConfig.getAddress();
135         final var port = connConfig.getPort();
136         final var bindFuture = address != null ? serverBootstrap.bind(address.getHostAddress(), port)
137             : serverBootstrap.bind(port);
138
139         // Clean up or hand off to caller
140         final var retFuture = SettableFuture.<TcpServerFacade>create();
141         bindFuture.addListener((ChannelFutureListener) future -> {
142             final var cause = future.cause();
143             if (cause != null) {
144                 childGroup.shutdownGracefully();
145                 parentGroup.shutdownGracefully();
146                 retFuture.setException(cause);
147                 return;
148             }
149
150             final var channel = future.channel();
151             final var handler = new TcpServerFacade(parentGroup, childGroup, bootstrap, channelInitializer,
152                 (InetSocketAddress) channel.localAddress());
153             // Hook onto the channel's termination to initiate group shutdown
154             channel.closeFuture().addListener(closeFuture -> handler.shutdown());
155             retFuture.set(handler);
156         });
157         return retFuture;
158     }
159
160     /**
161      * Returns the number of connected clients / channels.
162      *
163      * @return number of connected clients / channels
164      */
165     public int getNumberOfConnections() {
166         return channelInitializer.size();
167     }
168
169     @Override
170     public void initiateConnection(final String host, final int port) {
171         try {
172             bootstrap.connect(host, port).sync();
173         } catch (InterruptedException e) {
174             LOG.error("Unable to initiate connection", e);
175         }
176     }
177
178     @Override
179     synchronized @NonNull ListenableFuture<Void> shutdown() {
180         final var local = childGroup;
181         if (local != null) {
182             LOG.info("Cleaning up TCP/TLS connection resources on {}", localAddress());
183             childGroup = null;
184             local.shutdownGracefully();
185         }
186         return super.shutdown();
187     }
188 }