import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
private LispSouthboundHandler lispSouthboundHandler;
private LispXtrSouthboundHandler lispXtrSouthboundHandler;
private NotificationPublishService notificationPublishService;
- private Channel channel;
+ private int numChannels = 1;
+ private Channel[] channel;
private Channel xtrChannel;
private Class channelType;
private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
this.notificationPublishService = notificationPublishService;
this.clusterSingletonService = clusterSingletonService;
this.clusterSingletonService.registerClusterSingletonService(this);
+ if (Epoll.isAvailable()) {
+ numChannels = 5;
+ }
+ channel = new Channel[numChannels];
}
public void init() {
lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
if (Epoll.isAvailable()) {
- eventLoopGroup = new EpollEventLoopGroup(0, threadFactory);
+ eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
channelType = EpollDatagramChannel.class;
+ bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
LOG.debug("Using Netty Epoll for UDP sockets");
} else {
eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
@SuppressWarnings("checkstyle:IllegalCatch")
private void start() {
try {
- channel = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+ for (int i = 0; i < numChannels; ++i) {
+ channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+ }
LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
} catch (Exception e) {
LOG.error("Failed to open main socket ", e);
@SuppressWarnings("checkstyle:IllegalCatch")
private void stop() {
try {
- channel.close().sync();
- channel = null;
+ for (int i = 0; i < numChannels; ++i) {
+ channel[i].close().sync();
+ channel[i] = null;
+ }
} catch (Exception e) {
LOG.error("Failed to close main socket ", e);
}
public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
final MessageType packetType, final int portNumber, Channel senderChannel) {
if (senderChannel == null) {
- senderChannel = this.channel;
+ senderChannel = this.channel[0];
}
InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
outBuffer.position(0);
import static org.junit.Assert.assertNull;
import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
Mockito.verify(handlerMock).close();
assertNull(getField("lispSouthboundHandler"));
assertNull(getField("lispXtrSouthboundHandler"));
- assertNull(getField("channel"));
+ Channel[] channel = getField("channel");
+ assertNull(channel[0]);
}
private static void injectChannel() throws NoSuchFieldException, IllegalAccessException {
final Field channelField = LispSouthboundPlugin.class.getDeclaredField("channel");
channelField.setAccessible(true);
- channelField.set(lispSouthboundPlugin, channel);
+ channelField.set(lispSouthboundPlugin, new Channel[] { channel });
}
private static void injectXtrChannel() throws NoSuchFieldException, IllegalAccessException {