From bb53240896a70377005d772ba440cf5bbf519f2b Mon Sep 17 00:00:00 2001 From: Lorand Jakab Date: Mon, 25 Jan 2016 19:35:29 +0200 Subject: [PATCH] Bug 5047: Improve soutbound performance This commit resulted in ~40% performance increase for the Map-Request/Map-Reply tests [1] on my local VM. [1] https://jenkins.opendaylight.org/releng/view/lispflowmapping/job/lispflowmapping-csit-1node-performance-only-beryllium/plot/#lispflowmapping-csit-1node-performance-only-beryllium-pps.csv-history.csv Change-Id: I70a8a893a022f958e2ecfaa93271b170473ff8ce Signed-off-by: Lorand Jakab --- mappingservice/southbound/pom.xml | 4 + .../southbound/LispSouthboundPlugin.java | 310 ++++++++---------- .../lisp/ILispSouthboundService.java | 2 +- ...ervice.java => LispSouthboundHandler.java} | 48 ++- ...ice.java => LispXtrSouthboundHandler.java} | 31 +- .../lisp/LispSouthboundServiceTest.java | 37 ++- 6 files changed, 215 insertions(+), 217 deletions(-) rename mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/{LispSouthboundService.java => LispSouthboundHandler.java} (87%) rename mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/{LispXtrSouthboundService.java => LispXtrSouthboundHandler.java} (82%) diff --git a/mappingservice/southbound/pom.xml b/mappingservice/southbound/pom.xml index b9a25d8e2..73e08bd55 100644 --- a/mappingservice/southbound/pom.xml +++ b/mappingservice/southbound/pom.xml @@ -21,6 +21,10 @@ ${project.groupId} mappingservice.lisp-proto + + io.netty + netty-all + junit-addons junit-addons diff --git a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java index 699542a27..bdf4e5371 100644 --- a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java +++ b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java @@ -8,25 +8,30 @@ package org.opendaylight.lispflowmapping.southbound; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; +import static io.netty.buffer.Unpooled.wrappedBuffer; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.util.concurrent.DefaultThreadFactory; + import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; - -import org.apache.commons.lang3.exception.ExceptionUtils; +import java.util.concurrent.ThreadFactory; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.lispflowmapping.lisp.type.LispMessage; -import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService; -import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService; -import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService; +import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler; +import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler; import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin; import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType; import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress; @@ -40,225 +45,166 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class); private static Object startLock = new Object(); - private LispIoThread lispThread; - private LispIoThread xtrThread; - private LispSouthboundService lispSouthboundService; - private LispXtrSouthboundService lispXtrSouthboundService; + private LispSouthboundHandler lispSouthboundHandler; + private LispXtrSouthboundHandler lispXtrSouthboundHandler; private NotificationPublishService notificationPublishService; private RpcProviderRegistry rpcRegistry; private BindingAwareBroker broker; - private volatile DatagramSocket socket = null; - private volatile String bindingAddress = null; + private NioDatagramChannel channel; + private volatile String bindingAddress = "0.0.0.0"; private volatile int xtrPort = LispMessage.XTR_PORT_NUM; private volatile boolean listenOnXtrPort = false; private BindingAwareBroker.RpcRegistration sbRpcRegistration; - private DatagramSocket xtrSocket; + private NioDatagramChannel xtrChannel; private LispSouthboundStats statistics = new LispSouthboundStats(); + private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb"); + private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory); + public void init() { - LOG.info("LISP (RFC6830) southbound plugin is initializing..."); + LOG.info("LISP (RFC6830) Southbound Plugin is initializing..."); final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this); sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler); broker.registerProvider(this); synchronized (startLock) { - lispSouthboundService = new LispSouthboundService(this); - lispXtrSouthboundService = new LispXtrSouthboundService(); - lispSouthboundService.setNotificationProvider(this.notificationPublishService); - lispXtrSouthboundService.setNotificationProvider(this.notificationPublishService); - if (bindingAddress == null) { - setLispAddress("0.0.0.0"); - } - LOG.info("LISP (RFC6830) southbound plugin is up!"); - } - } + lispSouthboundHandler = new LispSouthboundHandler(this); + lispXtrSouthboundHandler = new LispXtrSouthboundHandler(); + lispSouthboundHandler.setNotificationProvider(this.notificationPublishService); + lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService); - public void setNotificationPublishService(NotificationPublishService notificationService) { - this.notificationPublishService = notificationService; - } + start(); + startXtr(); - public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) { - this.rpcRegistry = rpcRegistry; - } - - public void setBindingAwareBroker(BindingAwareBroker broker) { - this.broker = broker; + LOG.info("LISP (RFC6830) Southbound Plugin is up!"); + } } - private void unloadActions() { - if (lispThread != null) { - lispThread.stopRunning(); - } - lispSouthboundService = null; - lispXtrSouthboundService = null; - lispThread = null; - xtrThread = null; - bindingAddress = null; - LOG.info("LISP (RFC6830) southbound plugin is down!"); + private void start() { try { - Thread.sleep(1100); - } catch (InterruptedException e) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup); + bootstrap.channel(NioDatagramChannel.class); + bootstrap.handler(lispSouthboundHandler); + channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel(); + } catch (Exception e) { + LOG.error("Failed to open main socket ", e); } } - private class LispIoThread extends Thread { - private volatile boolean shouldRun; - private volatile DatagramSocket threadSocket = null; - private volatile ILispSouthboundService service; - private volatile boolean running; - - public LispIoThread(DatagramSocket socket, ILispSouthboundService service) { - super("Lisp Thread"); - this.threadSocket = socket; - this.service = service; - shouldRun = true; - } - - @Override - public void run() { - running = true; - - int lispReceiveTimeout = 1000; - - LOG.info("LISP (RFC6830) southbound plugin is running and listening on address: " + bindingAddress - + " port: " + threadSocket.getLocalPort()); + private void startXtr() { + if (listenOnXtrPort) { try { - - threadSocket.setSoTimeout(lispReceiveTimeout); - } catch (SocketException e) { - LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e); - return; + Bootstrap xtrBootstrap = new Bootstrap(); + xtrBootstrap.group(eventLoopGroup); + xtrBootstrap.channel(NioDatagramChannel.class); + xtrBootstrap.handler(lispXtrSouthboundHandler); + xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel(); + } catch (Exception e) { + LOG.error("Failed to open xTR socket ", e); } + } + } - while (shouldRun) { - byte[] buffer = new byte[4096]; - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - try { - threadSocket.receive(packet); - LOG.trace("Received a packet!"); - } catch (SocketTimeoutException ste) { - LOG.trace("Timed out waiting on socket", ste); - continue; - } catch (IOException e) { - LOG.warn("IO Exception while trying to recieve packet", e); - } - LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress() - .getHostAddress(), packet.getPort(), packet.getLength())); + private void stop() { + try { + channel.close().sync(); + channel = null; + } catch (Exception e) { + LOG.error("Failed to close main socket ", e); + } + } - try { - this.service.handlePacket(packet); - } catch (Exception e) { - LOG.warn("Error while handling packet", e); - } + private void stopXtr() { + if (listenOnXtrPort) { + try { + xtrChannel.close().sync(); + xtrChannel = null; + } catch (Exception e) { + LOG.error("Failed to close xTR socket ", e); } - - threadSocket.close(); - LOG.trace("Socket closed"); - running = false; } + } - public void stopRunning() { - shouldRun = false; - } + private void restart() { + LOG.info("Reloading"); + stop(); + start(); + } - public boolean isRunning() { - return running; - } + private void restartXtr() { + LOG.info("Reloading xTR"); + stopXtr(); + startXtr(); } - public static String intToIpv4(int address) { - return ((address >> 24) & 0xff) + "." + // - ((address >> 16) & 0xff) + "." + // - ((address >> 8) & 0xff) + "." + // - ((address >> 0) & 0xff); + public void setNotificationPublishService(NotificationPublishService notificationService) { + this.notificationPublishService = notificationService; } - private void startIOThread() { - if (socket != null) { - while (!socket.isClosed()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } - } - try { - socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM)); - lispThread = new LispIoThread(socket, lispSouthboundService); - lispThread.start(); - LOG.info("LISP (RFC6830) southbound plugin is listening for control packets!"); - if (listenOnXtrPort) { - restartXtrThread(); - } - } catch (SocketException e) { - LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e)); - } + public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) { + this.rpcRegistry = rpcRegistry; } - private void restartXtrThread() { - try { - stopXtrThread(); - xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort)); - xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService); - xtrThread.start(); - LOG.info("xTR southbound plugin is up!"); - } catch (SocketException e) { - LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e)); - } + public void setBindingAwareBroker(BindingAwareBroker broker) { + this.broker = broker; } - public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, MessageType packetType) { - DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit()); - packet.setPort(address.getPort().getValue()); + private void unloadActions() { + lispSouthboundHandler = null; + lispXtrSouthboundHandler = null; + bindingAddress = "0.0.0.0"; + + stop(); + stopXtr(); + + LOG.info("LISP (RFC6830) Southbound Plugin is down!"); + } + + public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, + final MessageType packetType) { InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue())); - packet.setAddress(ip); - try { - if (LOG.isDebugEnabled()) { - LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip); - } - socket.send(packet); - this.statistics.incrementTx(packetType.getIntValue()); - } catch (IOException e) { - LOG.warn("Failed to send " + packetType, e); - this.statistics.incrementTxErrors(); + InetSocketAddress recipient = new InetSocketAddress(ip, address.getPort().getValue()); + // the wrappedBuffer() method doesn't copy data, so this conversion shouldn't hurt performance + ByteBuf data = wrappedBuffer(outBuffer.array()); + DatagramPacket packet = new DatagramPacket(data, recipient); + LOG.debug("Sending {} on port {} to address: {}", packetType, address.getPort().getValue(), ip); + if (LOG.isTraceEnabled()) { + LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data)); } + channel.write(packet).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + LOG.trace("Success"); + statistics.incrementTx(packetType.getIntValue()); + } else { + LOG.warn("Failed to send packet"); + statistics.incrementTxErrors(); + } + } + }); + channel.flush(); } public LispSouthboundStats getStats() { return statistics; } + @Override public void setLispAddress(String address) { synchronized (startLock) { - if (bindingAddress != null && bindingAddress.equals(address)) { - LOG.trace("configured lisp binding address didn't change."); + if (bindingAddress.equals(address)) { + LOG.debug("Configured LISP binding address didn't change."); } else { - String action = (bindingAddress == null ? "Setting" : "Resetting"); - LOG.trace(action + " lisp binding address to: " + address); + LOG.debug("Setting LISP binding address to {}", address); bindingAddress = address; - if (lispThread != null) { - lispThread.stopRunning(); - while (lispThread.isRunning()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } - } - stopXtrThread(); - startIOThread(); - } - } - } - - private void stopXtrThread() { - if (xtrThread != null) { - xtrThread.stopRunning(); - while (xtrThread.isRunning()) { try { - Thread.sleep(500); - } catch (InterruptedException e) { + restart(); + restartXtr(); + } catch (Exception e) { + LOG.error("Failed to set LISP binding address: ", e); } } } @@ -268,11 +214,10 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) { listenOnXtrPort = shouldListenOnXtrPort; if (listenOnXtrPort) { - LOG.debug("restarting xtr thread"); - restartXtrThread(); + restartXtr(); } else { - LOG.debug("terminating thread"); - stopXtrThread(); + LOG.info("Shutting down xTR"); + stopXtr(); } } @@ -280,13 +225,14 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl public void setXtrPort(int port) { this.xtrPort = port; if (listenOnXtrPort) { - restartXtrThread(); + restartXtr(); } } @Override public void close() throws Exception { unloadActions(); + eventLoopGroup.shutdownGracefully(); sbRpcRegistration.close(); } diff --git a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/ILispSouthboundService.java b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/ILispSouthboundService.java index 7cc0c8648..5d0661b6f 100644 --- a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/ILispSouthboundService.java +++ b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/ILispSouthboundService.java @@ -7,7 +7,7 @@ */ package org.opendaylight.lispflowmapping.southbound.lisp; -import java.net.DatagramPacket; +import io.netty.channel.socket.DatagramPacket; public interface ILispSouthboundService { diff --git a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundService.java b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundHandler.java similarity index 87% rename from mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundService.java rename to mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundHandler.java index 1d87f6ce3..0d7bd8ff3 100644 --- a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundService.java +++ b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundHandler.java @@ -8,7 +8,12 @@ package org.opendaylight.lispflowmapping.southbound.lisp; -import java.net.DatagramPacket; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; + import java.net.InetAddress; import java.nio.ByteBuffer; @@ -39,14 +44,15 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types. import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LispSouthboundService implements ILispSouthboundService { +@ChannelHandler.Sharable +public class LispSouthboundHandler extends SimpleChannelInboundHandler implements ILispSouthboundService { private NotificationPublishService notificationPublishService; - protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundService.class); + protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundHandler.class); private final LispSouthboundPlugin lispSbPlugin; private LispSouthboundStats lispSbStats = null; - public LispSouthboundService(LispSouthboundPlugin lispSbPlugin) { + public LispSouthboundHandler(LispSouthboundPlugin lispSbPlugin) { this.lispSbPlugin = lispSbPlugin; if (lispSbPlugin != null) { this.lispSbStats = lispSbPlugin.getStats(); @@ -57,29 +63,28 @@ public class LispSouthboundService implements ILispSouthboundService { this.notificationPublishService = nps; } - public void handlePacket(DatagramPacket packet) { - ByteBuffer inBuffer = ByteBuffer.wrap(packet.getData(), 0, packet.getLength()); + public void handlePacket(DatagramPacket msg) { + ByteBuffer inBuffer = msg.content().nioBuffer(); int type = ByteUtil.getUnsignedByte(inBuffer, LispMessage.Pos.TYPE) >> 4; handleStats(type); Object lispType = MessageType.forValue(type); if (lispType == MessageType.EncapsulatedControlMessage) { LOG.trace("Received packet of type Encapsulated Control Message"); - handleEncapsulatedControlMessage(inBuffer, packet.getAddress()); + handleEncapsulatedControlMessage(inBuffer, msg.sender().getAddress()); } else if (lispType == MessageType.MapRequest) { LOG.trace("Received packet of type Map-Request"); - handleMapRequest(inBuffer, packet.getPort()); + handleMapRequest(inBuffer, msg.sender().getPort()); } else if (lispType == MessageType.MapRegister) { LOG.trace("Received packet of type Map-Register"); - handleMapRegister(inBuffer, packet.getAddress(), packet.getPort()); + handleMapRegister(inBuffer, msg.sender().getAddress(), msg.sender().getPort()); } else if (lispType == MessageType.MapNotify) { LOG.trace("Received packet of type Map-Notify"); - handleMapNotify(inBuffer, packet.getAddress(), packet.getPort()); + handleMapNotify(inBuffer, msg.sender().getAddress(), msg.sender().getPort()); } else if (lispType == MessageType.MapReply) { LOG.trace("Received packet of type Map-Reply"); - handleMapReply(inBuffer, packet.getAddress(), packet.getPort()); + handleMapReply(inBuffer, msg.sender().getAddress(), msg.sender().getPort()); } else { LOG.warn("Received unknown LISP control packet (type " + ((lispType != null) ? lispType : type) + ")"); - LOG.trace("Buffer: " + ByteUtil.bytesToHex(packet.getData(), packet.getLength())); } } @@ -213,4 +218,23 @@ public class LispSouthboundService implements ILispSouthboundService { } } } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Received UDP packet from {}:{} with content:\n{}", msg.sender().getHostString(), + msg.sender().getPort(), ByteBufUtil.prettyHexDump(msg.content())); + } + handlePacket(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.error("Error on channel: " + cause, cause); + } } diff --git a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispXtrSouthboundService.java b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispXtrSouthboundHandler.java similarity index 82% rename from mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispXtrSouthboundService.java rename to mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispXtrSouthboundHandler.java index b2102dfe4..f17855961 100644 --- a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispXtrSouthboundService.java +++ b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/lisp/LispXtrSouthboundHandler.java @@ -8,7 +8,11 @@ package org.opendaylight.lispflowmapping.southbound.lisp; -import java.net.DatagramPacket; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; + import java.net.InetAddress; import java.nio.ByteBuffer; @@ -30,16 +34,16 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types. import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LispXtrSouthboundService implements ILispSouthboundService { +public class LispXtrSouthboundHandler extends SimpleChannelInboundHandler implements ILispSouthboundService { private NotificationPublishService notificationPublishService; - protected static final Logger LOG = LoggerFactory.getLogger(LispXtrSouthboundService.class); + protected static final Logger LOG = LoggerFactory.getLogger(LispXtrSouthboundHandler.class); public void setNotificationProvider(NotificationPublishService nps) { this.notificationPublishService = nps; } public void handlePacket(DatagramPacket packet) { - ByteBuffer inBuffer = ByteBuffer.wrap(packet.getData(), 0, packet.getLength()); + ByteBuffer inBuffer = packet.content().nioBuffer(); Object lispType = MessageType.forValue((int) (ByteUtil.getUnsignedByte(inBuffer, LispMessage.Pos.TYPE) >> 4)); if (lispType == MessageType.MapRequest) { LOG.trace("Received packet of type MapRequest for xTR"); @@ -98,4 +102,23 @@ public class LispXtrSouthboundService implements ILispSouthboundService { LOG.warn("Notification publication interrupted!"); } } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Received UDP packet from {}:{} with content:\n{}", msg.sender().getHostString(), + msg.sender().getPort(), ByteBufUtil.prettyHexDump(msg.content())); + } + handlePacket(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.error("Error on channel: " + cause, cause); + } } diff --git a/mappingservice/southbound/src/test/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundServiceTest.java b/mappingservice/southbound/src/test/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundServiceTest.java index 8956d0f26..2e557b527 100644 --- a/mappingservice/southbound/src/test/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundServiceTest.java +++ b/mappingservice/southbound/src/test/java/org/opendaylight/lispflowmapping/southbound/lisp/LispSouthboundServiceTest.java @@ -8,11 +8,13 @@ package org.opendaylight.lispflowmapping.southbound.lisp; +import static io.netty.buffer.Unpooled.wrappedBuffer; +import io.netty.channel.socket.DatagramPacket; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.net.DatagramPacket; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -60,7 +62,7 @@ import org.opendaylight.yangtools.yang.binding.Notification; public class LispSouthboundServiceTest extends BaseTestCase { - private LispSouthboundService testedLispService; + private LispSouthboundHandler testedLispService; private NotificationPublishService nps; private byte[] mapRequestPacket; private byte[] mapRegisterPacket; @@ -96,7 +98,7 @@ public class LispSouthboundServiceTest extends BaseTestCase { super.before(); // mapResolver = context.mock(IMapResolver.class); // mapServer = context.mock(IMapServer.class); - testedLispService = new LispSouthboundService(null); + testedLispService = new LispSouthboundHandler(null); nps = context.mock(NotificationPublishService.class); testedLispService.setNotificationProvider(nps); lispNotificationSaver = new ValueSaverAction(); @@ -350,20 +352,19 @@ public class LispSouthboundServiceTest extends BaseTestCase { System.arraycopy(mapRegisterPacket, 0, extraDataPacket, 0, mapRegisterPacket.length); stubMapRegister(true); - DatagramPacket dp = new DatagramPacket(extraDataPacket, extraDataPacket.length); - dp.setLength(mapRegisterPacket.length); + DatagramPacket dp = new DatagramPacket(wrappedBuffer(extraDataPacket), new InetSocketAddress(0), new InetSocketAddress(0)); testedLispService.handlePacket(dp); // Check map register fields. // XXX: test // byte[] notifyResult = testedLispService.handlePacket(dp).getData(); - byte[] notifyResult = lastMapNotifyPacket().getData(); + byte[] notifyResult = lastMapNotifyPacket().content().array(); assertEquals(mapRegisterPacket.length, notifyResult.length); } private DatagramPacket lastMapReplyPacket() { ByteBuffer serialize = MapReplySerializer.getInstance().serialize(mapReplyBuilder.build()); - return new DatagramPacket(serialize.array(), serialize.array().length); + return new DatagramPacket(wrappedBuffer(serialize), new InetSocketAddress(0), new InetSocketAddress(0)); } private DatagramPacket lastMapNotifyPacket() { @@ -376,7 +377,7 @@ public class LispSouthboundServiceTest extends BaseTestCase { mapNotifyBuilder.setKeyId((short) 0); mapNotifyBuilder.setAuthenticationData(new byte[0]); ByteBuffer serialize = MapNotifySerializer.getInstance().serialize(mapNotifyBuilder.build()); - return new DatagramPacket(serialize.array(), serialize.array().length); + return new DatagramPacket(wrappedBuffer(serialize), new InetSocketAddress(0), new InetSocketAddress(0)); } @Test @@ -409,7 +410,7 @@ public class LispSouthboundServiceTest extends BaseTestCase { stubMapRegister(true); DatagramPacket notifyPacket = handleMapRegisterPacket(mapRegisterPacket); - assertEquals(LispMessage.PORT_NUM, notifyPacket.getPort()); + assertEquals(LispMessage.PORT_NUM, notifyPacket.recipient().getPort()); } @Test @@ -482,7 +483,7 @@ public class LispSouthboundServiceTest extends BaseTestCase { // ret(mapReply); DatagramPacket replyPacket = handleMapRequestPacket(mapRequestPacket); - assertEquals(4342, replyPacket.getPort()); + assertEquals(4342, replyPacket.recipient().getPort()); } @Test @@ -578,7 +579,7 @@ public class LispSouthboundServiceTest extends BaseTestCase { public void mapReply__UseEncapsulatedUdpPort() throws Exception { stubHandleRequest(); - assertEquals(LispMessage.PORT_NUM, handleMapRequestPacket(mapRequestPacket).getPort()); + assertEquals(LispMessage.PORT_NUM, handleMapRequestPacket(mapRequestPacket).recipient().getPort()); } @Test @@ -705,28 +706,28 @@ public class LispSouthboundServiceTest extends BaseTestCase { private byte[] handleMapRequestAsByteArray(byte[] inPacket) { handleMapRequestPacket(inPacket); - return lastMapReplyPacket().getData(); + return lastMapReplyPacket().content().array(); } private byte[] handleMapRegisterAsByteArray(byte[] inPacket) { handleMapRegisterPacket(inPacket); - return lastMapNotifyPacket().getData(); + return lastMapNotifyPacket().content().array(); } private DatagramPacket handleMapRequestPacket(byte[] inPacket) { - DatagramPacket dp = new DatagramPacket(inPacket, inPacket.length); + DatagramPacket dp = new DatagramPacket(wrappedBuffer(inPacket), new InetSocketAddress(0), new InetSocketAddress(0)); // Unless we explicitly set the source port, it will be -1, which breaks some tests // This is till not the real port number, but it's better - dp.setPort(LispMessage.PORT_NUM); + //dp.setPort(LispMessage.PORT_NUM); testedLispService.handlePacket(dp); return lastMapReplyPacket(); } private DatagramPacket handleMapRegisterPacket(byte[] inPacket) { - DatagramPacket dp = new DatagramPacket(inPacket, inPacket.length); + DatagramPacket dp = new DatagramPacket(wrappedBuffer(inPacket), new InetSocketAddress(0), new InetSocketAddress(0)); // Unless we explicitly set the source port, it will be -1, which breaks some tests // This is till not the real port number, but it's better - dp.setPort(LispMessage.PORT_NUM); + //dp.setPort(LispMessage.PORT_NUM); testedLispService.handlePacket(dp); if (mapNotifyBuilder == null) { return null; @@ -737,7 +738,7 @@ public class LispSouthboundServiceTest extends BaseTestCase { private DatagramPacket handlePacket(byte[] inPacket) { // TODO get from mock - testedLispService.handlePacket(new DatagramPacket(inPacket, inPacket.length)); + testedLispService.handlePacket(new DatagramPacket(wrappedBuffer(inPacket), new InetSocketAddress(0), new InetSocketAddress(0))); return null; } -- 2.36.6